all lists on lists.proxmox.com
 help / color / mirror / Atom feed
* Re: [pbs-devel] [PATCH proxmox-backup v5 3/8] pbs-datastore: add active operations tracking
@ 2022-01-24 13:44 Hannes Laimer
  2022-01-31 14:44 ` Dominik Csapak
  0 siblings, 1 reply; 3+ messages in thread
From: Hannes Laimer @ 2022-01-24 13:44 UTC (permalink / raw)
  To: pbs-devel

Saves the currently active read/write operation counts in a file. The
file is updated whenever a reference returned by lookup_datastore is
dropped and whenever a reference is returned by lookup_datastore. There
is one file per datastore, and it is locked before every access.

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
FIXUP: fix logic; the previous version of this patch missed updates for
PIDs that were not present in a non-empty list

 pbs-api-types/src/maintenance.rs   |   1 +
 pbs-datastore/Cargo.toml           |   1 +
 pbs-datastore/src/datastore.rs     | 100 ++++++++++++++++++++---------
 pbs-datastore/src/lib.rs           |   4 ++
 pbs-datastore/src/task_tracking.rs |  72 +++++++++++++++++++++
 src/bin/proxmox-backup-api.rs      |   1 +
 src/server/mod.rs                  |  16 ++++-
 7 files changed, 163 insertions(+), 32 deletions(-)
 create mode 100644 pbs-datastore/src/task_tracking.rs

diff --git a/pbs-api-types/src/maintenance.rs b/pbs-api-types/src/maintenance.rs
index 4d3ccb3b..1b5753d2 100644
--- a/pbs-api-types/src/maintenance.rs
+++ b/pbs-api-types/src/maintenance.rs
@@ -25,6 +25,7 @@ pub enum MaintenanceType {
     Offline(String),
 }
 
+#[derive(Copy ,Clone)]
 /// Operation requirments, used when checking for maintenance mode.
 pub enum Operation {
     Read,
diff --git a/pbs-datastore/Cargo.toml b/pbs-datastore/Cargo.toml
index d8cefa00..0c703d44 100644
--- a/pbs-datastore/Cargo.toml
+++ b/pbs-datastore/Cargo.toml
@@ -35,5 +35,6 @@ proxmox-uuid = "1"
 proxmox-sys = "0.2"
 
 pbs-api-types = { path = "../pbs-api-types" }
+pbs-buildcfg = { path = "../pbs-buildcfg" }
 pbs-tools = { path = "../pbs-tools" }
 pbs-config = { path = "../pbs-config" }
diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
index c65d4574..774dea80 100644
--- a/pbs-datastore/src/datastore.rs
+++ b/pbs-datastore/src/datastore.rs
@@ -9,11 +9,12 @@ use std::time::Duration;
 use anyhow::{bail, format_err, Error};
 use lazy_static::lazy_static;
 
-use proxmox_sys::fs::{replace_file, file_read_optional_string, CreateOptions};
+use proxmox_sys::fs::{replace_file, file_read_optional_string,
+    CreateOptions, lock_dir_noblock, DirLockGuard,
+};
 use proxmox_sys::process_locker::ProcessLockSharedGuard;
 use proxmox_sys::WorkerTaskContext;
 use proxmox_sys::{task_log, task_warn};
-use proxmox_sys::fs::{lock_dir_noblock, DirLockGuard};
 
 use pbs_api_types::{
     UPID, DataStoreConfig, Authid, MaintenanceType, Operation, GarbageCollectionStatus, HumanByte
@@ -31,9 +32,10 @@ use crate::manifest::{
     ArchiveType, BackupManifest,
     archive_type,
 };
+use crate::task_tracking::update_active_operations;
 
 lazy_static! {
-    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
+    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStoreImpl>>> = Mutex::new(HashMap::new());
 }
 
 /// checks if auth_id is owner, or, if owner is a token, if
@@ -54,13 +56,42 @@ pub fn check_backup_owner(
 ///
 /// A Datastore can store severals backups, and provides the
 /// management interface for backup.
-pub struct DataStore {
+pub struct DataStoreImpl {
     chunk_store: Arc<ChunkStore>,
     gc_mutex: Mutex<()>,
     last_gc_status: Mutex<GarbageCollectionStatus>,
     verify_new: bool,
 }
 
+pub struct DataStore {
+    inner: Arc<DataStoreImpl>,
+    operation: Option<Operation>,
+}
+
+impl Clone for DataStore {
+    fn clone(&self) -> Self {
+        if let Some(operation) = self.operation.clone() {
+            if let Err(e) = update_active_operations(self.name(), operation, -1) {
+                eprintln!("could not update active operations - {}", e);
+            }
+        }
+        DataStore {
+            inner: self.inner.clone(),
+            operation: self.operation.clone(),
+        }
+    }
+}
+
+impl Drop for DataStore {
+    fn drop(&mut self) {
+        if let Some(operation) = self.operation.clone() {
+            if let Err(e) = update_active_operations(self.name(), operation, -1) {
+                eprintln!("could not update active operations - {}", e);
+            }
+        }
+    }
+}
+
 impl DataStore {
     pub fn lookup_datastore(
         name: &str,
@@ -75,6 +106,7 @@ impl DataStore {
             | (Some(MaintenanceType::Offline(message)), Some(_)) => {
                 bail!("Datastore '{}' is in maintenance mode: {}", name, message);
             },
+            (_, Some(operation)) => update_active_operations(name, operation, 1)?,
             _ => {}
         }
 
@@ -85,7 +117,10 @@ impl DataStore {
             if datastore.chunk_store.base() == path &&
                 datastore.verify_new == config.verify_new.unwrap_or(false)
             {
-                return Ok(datastore.clone());
+                return Ok(Arc::new(Self {
+                    inner: datastore.clone(),
+                    operation,
+                }))
             }
         }
 
@@ -94,7 +129,10 @@ impl DataStore {
         let datastore = Arc::new(datastore);
         map.insert(name.to_string(), datastore.clone());
 
-        Ok(datastore)
+        Ok(Arc::new(Self {
+            inner: datastore,
+            operation,
+        }))
     }
 
     /// removes all datastores that are not configured anymore
@@ -109,7 +147,7 @@ impl DataStore {
         Ok(())
     }
 
-    fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result<Self, Error> {
+    fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result<DataStoreImpl, Error> {
         let chunk_store = ChunkStore::open(store_name, path)?;
 
         let mut gc_status_path = chunk_store.base_path();
@@ -127,7 +165,7 @@ impl DataStore {
             GarbageCollectionStatus::default()
         };
 
-        Ok(Self {
+        Ok(DataStoreImpl {
             chunk_store: Arc::new(chunk_store),
             gc_mutex: Mutex::new(()),
             last_gc_status: Mutex::new(gc_status),
@@ -141,19 +179,19 @@ impl DataStore {
         impl Iterator<Item = (Result<proxmox_sys::fs::ReadDirEntry, Error>, usize, bool)>,
         Error
     > {
-        self.chunk_store.get_chunk_iterator()
+        self.inner.chunk_store.get_chunk_iterator()
     }
 
     pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> {
 
-        let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
+        let index = FixedIndexWriter::create(self.inner.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
 
         Ok(index)
     }
 
     pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {
 
-        let full_path =  self.chunk_store.relative_path(filename.as_ref());
+        let full_path =  self.inner.chunk_store.relative_path(filename.as_ref());
 
         let index = FixedIndexReader::open(&full_path)?;
 
@@ -165,14 +203,14 @@ impl DataStore {
     ) -> Result<DynamicIndexWriter, Error> {
 
         let index = DynamicIndexWriter::create(
-            self.chunk_store.clone(), filename.as_ref())?;
+            self.inner.chunk_store.clone(), filename.as_ref())?;
 
         Ok(index)
     }
 
     pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {
 
-        let full_path =  self.chunk_store.relative_path(filename.as_ref());
+        let full_path =  self.inner.chunk_store.relative_path(filename.as_ref());
 
         let index = DynamicIndexReader::open(&full_path)?;
 
@@ -222,11 +260,11 @@ impl DataStore {
     }
 
     pub fn name(&self) -> &str {
-        self.chunk_store.name()
+        self.inner.chunk_store.name()
     }
 
     pub fn base_path(&self) -> PathBuf {
-        self.chunk_store.base_path()
+        self.inner.chunk_store.base_path()
     }
 
     /// Cleanup a backup directory
@@ -529,7 +567,7 @@ impl DataStore {
             worker.check_abort()?;
             worker.fail_on_shutdown()?;
             let digest = index.index_digest(pos).unwrap();
-            if !self.chunk_store.cond_touch_chunk(digest, false)? {
+            if !self.inner.chunk_store.cond_touch_chunk(digest, false)? {
                 task_warn!(
                     worker,
                     "warning: unable to access non-existent chunk {}, required by {:?}",
@@ -545,7 +583,7 @@ impl DataStore {
                     let mut bad_path = PathBuf::new();
                     bad_path.push(self.chunk_path(digest).0);
                     bad_path.set_extension(bad_ext);
-                    self.chunk_store.cond_touch_path(&bad_path, false)?;
+                    self.inner.chunk_store.cond_touch_path(&bad_path, false)?;
                 }
             }
         }
@@ -625,24 +663,24 @@ impl DataStore {
     }
 
     pub fn last_gc_status(&self) -> GarbageCollectionStatus {
-        self.last_gc_status.lock().unwrap().clone()
+        self.inner.last_gc_status.lock().unwrap().clone()
     }
 
     pub fn garbage_collection_running(&self) -> bool {
-        !matches!(self.gc_mutex.try_lock(), Ok(_))
+        !matches!(self.inner.gc_mutex.try_lock(), Ok(_))
     }
 
     pub fn garbage_collection(&self, worker: &dyn WorkerTaskContext, upid: &UPID) -> Result<(), Error> {
 
-        if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
+        if let Ok(ref mut _mutex) = self.inner.gc_mutex.try_lock() {
 
             // avoids that we run GC if an old daemon process has still a
             // running backup writer, which is not save as we have no "oldest
             // writer" information and thus no safe atime cutoff
-            let _exclusive_lock =  self.chunk_store.try_exclusive_lock()?;
+            let _exclusive_lock =  self.inner.chunk_store.try_exclusive_lock()?;
 
             let phase1_start_time = proxmox_time::epoch_i64();
-            let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
+            let oldest_writer = self.inner.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
 
             let mut gc_status = GarbageCollectionStatus::default();
             gc_status.upid = Some(upid.to_string());
@@ -652,7 +690,7 @@ impl DataStore {
             self.mark_used_chunks(&mut gc_status, worker)?;
 
             task_log!(worker, "Start GC phase2 (sweep unused chunks)");
-            self.chunk_store.sweep_unused_chunks(
+            self.inner.chunk_store.sweep_unused_chunks(
                 oldest_writer,
                 phase1_start_time,
                 &mut gc_status,
@@ -729,7 +767,7 @@ impl DataStore {
                 let _ = replace_file(path, serialized.as_bytes(), options, false);
             }
 
-            *self.last_gc_status.lock().unwrap() = gc_status;
+            *self.inner.last_gc_status.lock().unwrap() = gc_status;
 
         } else {
             bail!("Start GC failed - (already running/locked)");
@@ -739,15 +777,15 @@ impl DataStore {
     }
 
     pub fn try_shared_chunk_store_lock(&self) -> Result<ProcessLockSharedGuard, Error> {
-        self.chunk_store.try_shared_lock()
+        self.inner.chunk_store.try_shared_lock()
     }
 
     pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) {
-        self.chunk_store.chunk_path(digest)
+        self.inner.chunk_store.chunk_path(digest)
     }
 
     pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> {
-        self.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
+        self.inner.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
     }
 
     pub fn insert_chunk(
@@ -755,7 +793,7 @@ impl DataStore {
         chunk: &DataBlob,
         digest: &[u8; 32],
     ) -> Result<(bool, u64), Error> {
-        self.chunk_store.insert_chunk(chunk, digest)
+        self.inner.chunk_store.insert_chunk(chunk, digest)
     }
 
     pub fn load_blob(&self, backup_dir: &BackupDir, filename: &str) -> Result<DataBlob, Error> {
@@ -771,13 +809,13 @@ impl DataStore {
 
 
     pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> {
-        let (chunk_path, _digest_str) = self.chunk_store.chunk_path(digest);
+        let (chunk_path, _digest_str) = self.inner.chunk_store.chunk_path(digest);
         std::fs::metadata(chunk_path).map_err(Error::from)
     }
 
     pub fn load_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
 
-        let (chunk_path, digest_str) = self.chunk_store.chunk_path(digest);
+        let (chunk_path, digest_str) = self.inner.chunk_store.chunk_path(digest);
 
         proxmox_lang::try_block!({
             let mut file = std::fs::File::open(&chunk_path)?;
@@ -891,7 +929,7 @@ impl DataStore {
     }
 
     pub fn verify_new(&self) -> bool {
-        self.verify_new
+        self.inner.verify_new
     }
 
     /// returns a list of chunks sorted by their inode number on disk
diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
index d50a64a5..131040c6 100644
--- a/pbs-datastore/src/lib.rs
+++ b/pbs-datastore/src/lib.rs
@@ -145,6 +145,9 @@
 // Note: .pcat1 => Proxmox Catalog Format version 1
 pub const CATALOG_NAME: &str = "catalog.pcat1.didx";
 
+/// Directory path where active operations counters are saved.
+pub const ACTIVE_OPERATIONS_DIR: &str = concat!(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), "/active-operations");
+
 #[macro_export]
 macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 {
     () => {
@@ -179,6 +182,7 @@ pub mod paperkey;
 pub mod prune;
 pub mod read_chunk;
 pub mod store_progress;
+pub mod task_tracking;
 
 pub mod dynamic_index;
 pub mod fixed_index;
diff --git a/pbs-datastore/src/task_tracking.rs b/pbs-datastore/src/task_tracking.rs
new file mode 100644
index 00000000..608a3096
--- /dev/null
+++ b/pbs-datastore/src/task_tracking.rs
@@ -0,0 +1,72 @@
+use std::path::PathBuf;
+use anyhow::Error;
+use libc::pid_t;
+
+use proxmox_sys::fs::{CreateOptions, file_read_optional_string, open_file_locked, replace_file};
+use proxmox_sys::linux::procfs;
+use pbs_api_types::Operation;
+use serde::{Deserialize, Serialize};
+
+#[derive(Deserialize, Serialize, Clone)]
+struct TaskOperations {
+    pid: u32,
+    reading_operations: i64,
+    writing_operations: i64,
+}
+
+pub fn update_active_operations(name: &str, operation: Operation, count: i64) -> Result<(), Error> {
+    let path = PathBuf::from(format!("{}/{}", crate::ACTIVE_OPERATIONS_DIR, name));
+    let lock_path = PathBuf::from(format!("{}/{}.lock", crate::ACTIVE_OPERATIONS_DIR, name));
+
+    let user = pbs_config::backup_user()?;
+    let options = CreateOptions::new()
+        .group(user.gid)
+        .owner(user.uid)
+        .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+    let timeout = std::time::Duration::new(10, 0);
+    open_file_locked(&lock_path, timeout, true, options.clone())?;
+
+    let pid = std::process::id();
+    let mut updated = false;
+
+    let mut updated_tasks = match file_read_optional_string(&path)? {
+        Some(data) => {
+            let mut active_tasks: Vec<TaskOperations> = serde_json::from_str::<Vec<TaskOperations>>(&data)?
+                .iter()
+                .filter(|task|
+                    procfs::check_process_running(task.pid as pid_t).is_some())
+                .cloned()
+                .collect();
+
+            active_tasks.iter_mut()
+                .for_each(|task| {
+                    if task.pid == pid {
+                        updated = true;
+                        match operation {
+                            Operation::Read => task.reading_operations += count,
+                            Operation::Write => task.writing_operations += count,
+                        }
+                    }
+                });
+            active_tasks
+        }
+        None => Vec::new(),
+    };
+
+    if !updated {
+        updated_tasks.push(match operation {
+            Operation::Read => TaskOperations {
+                pid,
+                reading_operations: 1,
+                writing_operations: 0,
+            },
+            Operation::Write => TaskOperations {
+                pid,
+                reading_operations: 0,
+                writing_operations: 1,
+            }
+        })
+    }
+    replace_file(&path, serde_json::to_string(&updated_tasks)?.as_bytes(), options, false)
+}
\ No newline at end of file
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index e6fc5f23..445994dc 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -73,6 +73,7 @@ async fn run() -> Result<(), Error> {
 
     proxmox_backup::server::create_run_dir()?;
     proxmox_backup::server::create_state_dir()?;
+    proxmox_backup::server::create_active_operations_dir()?;
     proxmox_backup::server::jobstate::create_jobstate_dir()?;
     proxmox_backup::tape::create_tape_status_dir()?;
     proxmox_backup::tape::create_drive_state_dir()?;
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 2b54cdf0..5a9884e9 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -4,7 +4,7 @@
 //! services. We want async IO, so this is built on top of
 //! tokio/hyper.
 
-use anyhow::Error;
+use anyhow::{format_err, Error};
 use serde_json::Value;
 
 use proxmox_sys::fs::{create_path, CreateOptions};
@@ -71,3 +71,17 @@ pub fn create_state_dir() -> Result<(), Error> {
     create_path(pbs_buildcfg::PROXMOX_BACKUP_STATE_DIR_M!(), None, Some(opts))?;
     Ok(())
 }
+
+/// Create active operations dir with correct permission.
+pub fn create_active_operations_dir() -> Result<(), Error> {
+    let backup_user = pbs_config::backup_user()?;
+    let mode = nix::sys::stat::Mode::from_bits_truncate(0o0750);
+    let options = CreateOptions::new()
+        .perm(mode)
+        .owner(backup_user.uid)
+        .group(backup_user.gid);
+
+    create_path(pbs_datastore::ACTIVE_OPERATIONS_DIR, None, Some(options))
+        .map_err(|err: Error| format_err!("unable to create active operations dir - {}", err))?;
+    Ok(())
+}
-- 
2.30.2





^ permalink raw reply	[flat|nested] 3+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v5 3/8] pbs-datastore: add active operations tracking
  2022-01-24 13:44 [pbs-devel] [PATCH proxmox-backup v5 3/8] pbs-datastore: add active operations tracking Hannes Laimer
@ 2022-01-31 14:44 ` Dominik Csapak
  0 siblings, 0 replies; 3+ messages in thread
From: Dominik Csapak @ 2022-01-31 14:44 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Hannes Laimer

looks mostly fine (one comment inline)

but did you intentionally ignore Thomas's comment about
PID reuse and saving/checking the process start time?

On 1/24/22 14:44, Hannes Laimer wrote:
> Saves the currently active read/write operation counts in a file. The
> file is updated whenever a reference returned by lookup_datastore is
> dropped and whenever a reference is returned by lookup_datastore. The
> files are locked before every access, there is one file per datastore.
> 
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
> FIXUP: fix logic, previous version of this patch missed updates for
> PIDs that were not present in non empty list
> 
>   pbs-api-types/src/maintenance.rs   |   1 +
>   pbs-datastore/Cargo.toml           |   1 +
>   pbs-datastore/src/datastore.rs     | 100 ++++++++++++++++++++---------
>   pbs-datastore/src/lib.rs           |   4 ++
>   pbs-datastore/src/task_tracking.rs |  72 +++++++++++++++++++++
>   src/bin/proxmox-backup-api.rs      |   1 +
>   src/server/mod.rs                  |  16 ++++-
>   7 files changed, 163 insertions(+), 32 deletions(-)
>   create mode 100644 pbs-datastore/src/task_tracking.rs
> 
> diff --git a/pbs-api-types/src/maintenance.rs b/pbs-api-types/src/maintenance.rs
> index 4d3ccb3b..1b5753d2 100644
> --- a/pbs-api-types/src/maintenance.rs
> +++ b/pbs-api-types/src/maintenance.rs
> @@ -25,6 +25,7 @@ pub enum MaintenanceType {
>       Offline(String),
>   }
>   
> +#[derive(Copy ,Clone)]
>   /// Operation requirments, used when checking for maintenance mode.
>   pub enum Operation {
>       Read,
> diff --git a/pbs-datastore/Cargo.toml b/pbs-datastore/Cargo.toml
> index d8cefa00..0c703d44 100644
> --- a/pbs-datastore/Cargo.toml
> +++ b/pbs-datastore/Cargo.toml
> @@ -35,5 +35,6 @@ proxmox-uuid = "1"
>   proxmox-sys = "0.2"
>   
>   pbs-api-types = { path = "../pbs-api-types" }
> +pbs-buildcfg = { path = "../pbs-buildcfg" }
>   pbs-tools = { path = "../pbs-tools" }
>   pbs-config = { path = "../pbs-config" }
> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
> index c65d4574..774dea80 100644
> --- a/pbs-datastore/src/datastore.rs
> +++ b/pbs-datastore/src/datastore.rs
> @@ -9,11 +9,12 @@ use std::time::Duration;
>   use anyhow::{bail, format_err, Error};
>   use lazy_static::lazy_static;
>   
> -use proxmox_sys::fs::{replace_file, file_read_optional_string, CreateOptions};
> +use proxmox_sys::fs::{replace_file, file_read_optional_string,
> +    CreateOptions, lock_dir_noblock, DirLockGuard,
> +};
>   use proxmox_sys::process_locker::ProcessLockSharedGuard;
>   use proxmox_sys::WorkerTaskContext;
>   use proxmox_sys::{task_log, task_warn};
> -use proxmox_sys::fs::{lock_dir_noblock, DirLockGuard};
>   
>   use pbs_api_types::{
>       UPID, DataStoreConfig, Authid, MaintenanceType, Operation, GarbageCollectionStatus, HumanByte
> @@ -31,9 +32,10 @@ use crate::manifest::{
>       ArchiveType, BackupManifest,
>       archive_type,
>   };
> +use crate::task_tracking::update_active_operations;
>   
>   lazy_static! {
> -    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
> +    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStoreImpl>>> = Mutex::new(HashMap::new());
>   }
>   
>   /// checks if auth_id is owner, or, if owner is a token, if
> @@ -54,13 +56,42 @@ pub fn check_backup_owner(
>   ///
>   /// A Datastore can store severals backups, and provides the
>   /// management interface for backup.
> -pub struct DataStore {
> +pub struct DataStoreImpl {
>       chunk_store: Arc<ChunkStore>,
>       gc_mutex: Mutex<()>,
>       last_gc_status: Mutex<GarbageCollectionStatus>,
>       verify_new: bool,
>   }
>   
> +pub struct DataStore {
> +    inner: Arc<DataStoreImpl>,
> +    operation: Option<Operation>,
> +}
> +
> +impl Clone for DataStore {
> +    fn clone(&self) -> Self {
> +        if let Some(operation) = self.operation.clone() {
> +            if let Err(e) = update_active_operations(self.name(), operation, -1) {
> +                eprintln!("could not update active operations - {}", e);
> +            }
> +        }
> +        DataStore {
> +            inner: self.inner.clone(),
> +            operation: self.operation.clone(),
> +        }
> +    }
> +}
> +
> +impl Drop for DataStore {
> +    fn drop(&mut self) {
> +        if let Some(operation) = self.operation.clone() {
> +            if let Err(e) = update_active_operations(self.name(), operation, -1) {
> +                eprintln!("could not update active operations - {}", e);
> +            }
> +        }
> +    }
> +}
> +
>   impl DataStore {
>       pub fn lookup_datastore(
>           name: &str,
> @@ -75,6 +106,7 @@ impl DataStore {
>               | (Some(MaintenanceType::Offline(message)), Some(_)) => {
>                   bail!("Datastore '{}' is in maintenance mode: {}", name, message);
>               },
> +            (_, Some(operation)) => update_active_operations(name, operation, 1)?,
>               _ => {}
>           }
>   
> @@ -85,7 +117,10 @@ impl DataStore {
>               if datastore.chunk_store.base() == path &&
>                   datastore.verify_new == config.verify_new.unwrap_or(false)
>               {
> -                return Ok(datastore.clone());
> +                return Ok(Arc::new(Self {
> +                    inner: datastore.clone(),
> +                    operation,
> +                }))
>               }
>           }
>   
> @@ -94,7 +129,10 @@ impl DataStore {
>           let datastore = Arc::new(datastore);
>           map.insert(name.to_string(), datastore.clone());
>   
> -        Ok(datastore)
> +        Ok(Arc::new(Self {
> +            inner: datastore,
> +            operation,
> +        }))
>       }
>   
>       /// removes all datastores that are not configured anymore
> @@ -109,7 +147,7 @@ impl DataStore {
>           Ok(())
>       }
>   
> -    fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result<Self, Error> {
> +    fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result<DataStoreImpl, Error> {
>           let chunk_store = ChunkStore::open(store_name, path)?;
>   
>           let mut gc_status_path = chunk_store.base_path();
> @@ -127,7 +165,7 @@ impl DataStore {
>               GarbageCollectionStatus::default()
>           };
>   
> -        Ok(Self {
> +        Ok(DataStoreImpl {
>               chunk_store: Arc::new(chunk_store),
>               gc_mutex: Mutex::new(()),
>               last_gc_status: Mutex::new(gc_status),
> @@ -141,19 +179,19 @@ impl DataStore {
>           impl Iterator<Item = (Result<proxmox_sys::fs::ReadDirEntry, Error>, usize, bool)>,
>           Error
>       > {
> -        self.chunk_store.get_chunk_iterator()
> +        self.inner.chunk_store.get_chunk_iterator()
>       }
>   
>       pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> {
>   
> -        let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
> +        let index = FixedIndexWriter::create(self.inner.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
>   
>           Ok(index)
>       }
>   
>       pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {
>   
> -        let full_path =  self.chunk_store.relative_path(filename.as_ref());
> +        let full_path =  self.inner.chunk_store.relative_path(filename.as_ref());
>   
>           let index = FixedIndexReader::open(&full_path)?;
>   
> @@ -165,14 +203,14 @@ impl DataStore {
>       ) -> Result<DynamicIndexWriter, Error> {
>   
>           let index = DynamicIndexWriter::create(
> -            self.chunk_store.clone(), filename.as_ref())?;
> +            self.inner.chunk_store.clone(), filename.as_ref())?;
>   
>           Ok(index)
>       }
>   
>       pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {
>   
> -        let full_path =  self.chunk_store.relative_path(filename.as_ref());
> +        let full_path =  self.inner.chunk_store.relative_path(filename.as_ref());
>   
>           let index = DynamicIndexReader::open(&full_path)?;
>   
> @@ -222,11 +260,11 @@ impl DataStore {
>       }
>   
>       pub fn name(&self) -> &str {
> -        self.chunk_store.name()
> +        self.inner.chunk_store.name()
>       }
>   
>       pub fn base_path(&self) -> PathBuf {
> -        self.chunk_store.base_path()
> +        self.inner.chunk_store.base_path()
>       }
>   
>       /// Cleanup a backup directory
> @@ -529,7 +567,7 @@ impl DataStore {
>               worker.check_abort()?;
>               worker.fail_on_shutdown()?;
>               let digest = index.index_digest(pos).unwrap();
> -            if !self.chunk_store.cond_touch_chunk(digest, false)? {
> +            if !self.inner.chunk_store.cond_touch_chunk(digest, false)? {
>                   task_warn!(
>                       worker,
>                       "warning: unable to access non-existent chunk {}, required by {:?}",
> @@ -545,7 +583,7 @@ impl DataStore {
>                       let mut bad_path = PathBuf::new();
>                       bad_path.push(self.chunk_path(digest).0);
>                       bad_path.set_extension(bad_ext);
> -                    self.chunk_store.cond_touch_path(&bad_path, false)?;
> +                    self.inner.chunk_store.cond_touch_path(&bad_path, false)?;
>                   }
>               }
>           }
> @@ -625,24 +663,24 @@ impl DataStore {
>       }
>   
>       pub fn last_gc_status(&self) -> GarbageCollectionStatus {
> -        self.last_gc_status.lock().unwrap().clone()
> +        self.inner.last_gc_status.lock().unwrap().clone()
>       }
>   
>       pub fn garbage_collection_running(&self) -> bool {
> -        !matches!(self.gc_mutex.try_lock(), Ok(_))
> +        !matches!(self.inner.gc_mutex.try_lock(), Ok(_))
>       }
>   
>       pub fn garbage_collection(&self, worker: &dyn WorkerTaskContext, upid: &UPID) -> Result<(), Error> {
>   
> -        if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
> +        if let Ok(ref mut _mutex) = self.inner.gc_mutex.try_lock() {
>   
>               // avoids that we run GC if an old daemon process has still a
>               // running backup writer, which is not save as we have no "oldest
>               // writer" information and thus no safe atime cutoff
> -            let _exclusive_lock =  self.chunk_store.try_exclusive_lock()?;
> +            let _exclusive_lock =  self.inner.chunk_store.try_exclusive_lock()?;
>   
>               let phase1_start_time = proxmox_time::epoch_i64();
> -            let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
> +            let oldest_writer = self.inner.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
>   
>               let mut gc_status = GarbageCollectionStatus::default();
>               gc_status.upid = Some(upid.to_string());
> @@ -652,7 +690,7 @@ impl DataStore {
>               self.mark_used_chunks(&mut gc_status, worker)?;
>   
>               task_log!(worker, "Start GC phase2 (sweep unused chunks)");
> -            self.chunk_store.sweep_unused_chunks(
> +            self.inner.chunk_store.sweep_unused_chunks(
>                   oldest_writer,
>                   phase1_start_time,
>                   &mut gc_status,
> @@ -729,7 +767,7 @@ impl DataStore {
>                   let _ = replace_file(path, serialized.as_bytes(), options, false);
>               }
>   
> -            *self.last_gc_status.lock().unwrap() = gc_status;
> +            *self.inner.last_gc_status.lock().unwrap() = gc_status;
>   
>           } else {
>               bail!("Start GC failed - (already running/locked)");
> @@ -739,15 +777,15 @@ impl DataStore {
>       }
>   
>       pub fn try_shared_chunk_store_lock(&self) -> Result<ProcessLockSharedGuard, Error> {
> -        self.chunk_store.try_shared_lock()
> +        self.inner.chunk_store.try_shared_lock()
>       }
>   
>       pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) {
> -        self.chunk_store.chunk_path(digest)
> +        self.inner.chunk_store.chunk_path(digest)
>       }
>   
>       pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> {
> -        self.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
> +        self.inner.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
>       }
>   
>       pub fn insert_chunk(
> @@ -755,7 +793,7 @@ impl DataStore {
>           chunk: &DataBlob,
>           digest: &[u8; 32],
>       ) -> Result<(bool, u64), Error> {
> -        self.chunk_store.insert_chunk(chunk, digest)
> +        self.inner.chunk_store.insert_chunk(chunk, digest)
>       }
>   
>       pub fn load_blob(&self, backup_dir: &BackupDir, filename: &str) -> Result<DataBlob, Error> {
> @@ -771,13 +809,13 @@ impl DataStore {
>   
>   
>       pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> {
> -        let (chunk_path, _digest_str) = self.chunk_store.chunk_path(digest);
> +        let (chunk_path, _digest_str) = self.inner.chunk_store.chunk_path(digest);
>           std::fs::metadata(chunk_path).map_err(Error::from)
>       }
>   
>       pub fn load_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
>   
> -        let (chunk_path, digest_str) = self.chunk_store.chunk_path(digest);
> +        let (chunk_path, digest_str) = self.inner.chunk_store.chunk_path(digest);
>   
>           proxmox_lang::try_block!({
>               let mut file = std::fs::File::open(&chunk_path)?;
> @@ -891,7 +929,7 @@ impl DataStore {
>       }
>   
>       pub fn verify_new(&self) -> bool {
> -        self.verify_new
> +        self.inner.verify_new
>       }
>   
>       /// returns a list of chunks sorted by their inode number on disk
> diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
> index d50a64a5..131040c6 100644
> --- a/pbs-datastore/src/lib.rs
> +++ b/pbs-datastore/src/lib.rs
> @@ -145,6 +145,9 @@
>   // Note: .pcat1 => Proxmox Catalog Format version 1
>   pub const CATALOG_NAME: &str = "catalog.pcat1.didx";
>   
> +/// Directory path where active operations counters are saved.
> +pub const ACTIVE_OPERATIONS_DIR: &str = concat!(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), "/active-operations");
> +
>   #[macro_export]
>   macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 {
>       () => {
> @@ -179,6 +182,7 @@ pub mod paperkey;
>   pub mod prune;
>   pub mod read_chunk;
>   pub mod store_progress;
> +pub mod task_tracking;
>   
>   pub mod dynamic_index;
>   pub mod fixed_index;
> diff --git a/pbs-datastore/src/task_tracking.rs b/pbs-datastore/src/task_tracking.rs
> new file mode 100644
> index 00000000..608a3096
> --- /dev/null
> +++ b/pbs-datastore/src/task_tracking.rs
> @@ -0,0 +1,72 @@
> +use std::path::PathBuf;
> +use anyhow::Error;
> +use libc::pid_t;
> +
> +use proxmox_sys::fs::{CreateOptions, file_read_optional_string, open_file_locked, replace_file};
> +use proxmox_sys::linux::procfs;
> +use pbs_api_types::Operation;
> +use serde::{Deserialize, Serialize};
> +
> +#[derive(Deserialize, Serialize, Clone)]
> +struct TaskOperations {
> +    pid: u32,
> +    reading_operations: i64,
> +    writing_operations: i64,
> +}
> +
> +pub fn update_active_operations(name: &str, operation: Operation, count: i64) -> Result<(), Error> {
> +    let path = PathBuf::from(format!("{}/{}", crate::ACTIVE_OPERATIONS_DIR, name));
> +    let lock_path = PathBuf::from(format!("{}/{}.lock", crate::ACTIVE_OPERATIONS_DIR, name));
> +
> +    let user = pbs_config::backup_user()?;
> +    let options = CreateOptions::new()
> +        .group(user.gid)
> +        .owner(user.uid)
> +        .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
> +
> +    let timeout = std::time::Duration::new(10, 0);
> +    open_file_locked(&lock_path, timeout, true, options.clone())?;
> +
> +    let pid = std::process::id();
> +    let mut updated = false;
> +
> +    let mut updated_tasks = match file_read_optional_string(&path)? {
> +        Some(data) => {
> +            let mut active_tasks: Vec<TaskOperations> = serde_json::from_str::<Vec<TaskOperations>>(&data)?
> +                .iter()
> +                .filter(|task|
> +                    procfs::check_process_running(task.pid as pid_t).is_some())
> +                .cloned()
> +                .collect();
> +
> +            active_tasks.iter_mut()
> +                .for_each(|task| {
> +                    if task.pid == pid {
> +                        updated = true;
> +                        match operation {
> +                            Operation::Read => task.reading_operations += count,
> +                            Operation::Write => task.writing_operations += count,
> +                        }
> +                    }
> +                });
> +            active_tasks

Would it not be possible to do this counter update inside the filter
closure too, or perhaps by using 'filter_map'?

Alternatively, you could use a simple for loop over 'into_iter()' that
pushes only the entries you want to keep into a new Vec.

In any case, I think we can avoid the double iteration here.

> +        }
> +        None => Vec::new(),
> +    };
> +
> +    if !updated {
> +        updated_tasks.push(match operation {
> +            Operation::Read => TaskOperations {
> +                pid,
> +                reading_operations: 1,
> +                writing_operations: 0,
> +            },
> +            Operation::Write => TaskOperations {
> +                pid,
> +                reading_operations: 0,
> +                writing_operations: 1,
> +            }
> +        })
> +    }
> +    replace_file(&path, serde_json::to_string(&updated_tasks)?.as_bytes(), options, false)
> +}
> \ No newline at end of file
> diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
> index e6fc5f23..445994dc 100644
> --- a/src/bin/proxmox-backup-api.rs
> +++ b/src/bin/proxmox-backup-api.rs
> @@ -73,6 +73,7 @@ async fn run() -> Result<(), Error> {
>   
>       proxmox_backup::server::create_run_dir()?;
>       proxmox_backup::server::create_state_dir()?;
> +    proxmox_backup::server::create_active_operations_dir()?;
>       proxmox_backup::server::jobstate::create_jobstate_dir()?;
>       proxmox_backup::tape::create_tape_status_dir()?;
>       proxmox_backup::tape::create_drive_state_dir()?;
> diff --git a/src/server/mod.rs b/src/server/mod.rs
> index 2b54cdf0..5a9884e9 100644
> --- a/src/server/mod.rs
> +++ b/src/server/mod.rs
> @@ -4,7 +4,7 @@
>   //! services. We want async IO, so this is built on top of
>   //! tokio/hyper.
>   
> -use anyhow::Error;
> +use anyhow::{format_err, Error};
>   use serde_json::Value;
>   
>   use proxmox_sys::fs::{create_path, CreateOptions};
> @@ -71,3 +71,17 @@ pub fn create_state_dir() -> Result<(), Error> {
>       create_path(pbs_buildcfg::PROXMOX_BACKUP_STATE_DIR_M!(), None, Some(opts))?;
>       Ok(())
>   }
> +
> +/// Create active operations dir with correct permission.
> +pub fn create_active_operations_dir() -> Result<(), Error> {
> +    let backup_user = pbs_config::backup_user()?;
> +    let mode = nix::sys::stat::Mode::from_bits_truncate(0o0750);
> +    let options = CreateOptions::new()
> +        .perm(mode)
> +        .owner(backup_user.uid)
> +        .group(backup_user.gid);
> +
> +    create_path(pbs_datastore::ACTIVE_OPERATIONS_DIR, None, Some(options))
> +        .map_err(|err: Error| format_err!("unable to create active operations dir - {}", err))?;
> +    Ok(())
> +}





^ permalink raw reply	[flat|nested] 3+ messages in thread

* [pbs-devel] [PATCH proxmox-backup v5 3/8] pbs-datastore: add active operations tracking
  2022-01-24 12:31 [pbs-devel] [PATCH proxmox-backup v5 0/8] closes #3071: maintenance mode for datastore Hannes Laimer
@ 2022-01-24 12:31 ` Hannes Laimer
  0 siblings, 0 replies; 3+ messages in thread
From: Hannes Laimer @ 2022-01-24 12:31 UTC (permalink / raw)
  To: pbs-devel

Saves the currently active read/write operation counts in a file. The
file is updated whenever lookup_datastore returns a reference and
whenever such a reference is dropped. There is one file per datastore,
and it is locked before every access.

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 pbs-api-types/src/maintenance.rs   |   1 +
 pbs-datastore/Cargo.toml           |   1 +
 pbs-datastore/src/datastore.rs     | 100 ++++++++++++++++++++---------
 pbs-datastore/src/lib.rs           |   4 ++
 pbs-datastore/src/task_tracking.rs |  62 ++++++++++++++++++
 src/bin/proxmox-backup-api.rs      |   1 +
 src/server/mod.rs                  |  16 ++++-
 7 files changed, 153 insertions(+), 32 deletions(-)
 create mode 100644 pbs-datastore/src/task_tracking.rs

diff --git a/pbs-api-types/src/maintenance.rs b/pbs-api-types/src/maintenance.rs
index 4d3ccb3b..1b5753d2 100644
--- a/pbs-api-types/src/maintenance.rs
+++ b/pbs-api-types/src/maintenance.rs
@@ -25,6 +25,7 @@ pub enum MaintenanceType {
     Offline(String),
 }
 
+#[derive(Copy ,Clone)]
 /// Operation requirments, used when checking for maintenance mode.
 pub enum Operation {
     Read,
diff --git a/pbs-datastore/Cargo.toml b/pbs-datastore/Cargo.toml
index d8cefa00..0c703d44 100644
--- a/pbs-datastore/Cargo.toml
+++ b/pbs-datastore/Cargo.toml
@@ -35,5 +35,6 @@ proxmox-uuid = "1"
 proxmox-sys = "0.2"
 
 pbs-api-types = { path = "../pbs-api-types" }
+pbs-buildcfg = { path = "../pbs-buildcfg" }
 pbs-tools = { path = "../pbs-tools" }
 pbs-config = { path = "../pbs-config" }
diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
index c65d4574..774dea80 100644
--- a/pbs-datastore/src/datastore.rs
+++ b/pbs-datastore/src/datastore.rs
@@ -9,11 +9,12 @@ use std::time::Duration;
 use anyhow::{bail, format_err, Error};
 use lazy_static::lazy_static;
 
-use proxmox_sys::fs::{replace_file, file_read_optional_string, CreateOptions};
+use proxmox_sys::fs::{replace_file, file_read_optional_string,
+    CreateOptions, lock_dir_noblock, DirLockGuard,
+};
 use proxmox_sys::process_locker::ProcessLockSharedGuard;
 use proxmox_sys::WorkerTaskContext;
 use proxmox_sys::{task_log, task_warn};
-use proxmox_sys::fs::{lock_dir_noblock, DirLockGuard};
 
 use pbs_api_types::{
     UPID, DataStoreConfig, Authid, MaintenanceType, Operation, GarbageCollectionStatus, HumanByte
@@ -31,9 +32,10 @@ use crate::manifest::{
     ArchiveType, BackupManifest,
     archive_type,
 };
+use crate::task_tracking::update_active_operations;
 
 lazy_static! {
-    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
+    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStoreImpl>>> = Mutex::new(HashMap::new());
 }
 
 /// checks if auth_id is owner, or, if owner is a token, if
@@ -54,13 +56,42 @@ pub fn check_backup_owner(
 ///
 /// A Datastore can store severals backups, and provides the
 /// management interface for backup.
-pub struct DataStore {
+pub struct DataStoreImpl {
     chunk_store: Arc<ChunkStore>,
     gc_mutex: Mutex<()>,
     last_gc_status: Mutex<GarbageCollectionStatus>,
     verify_new: bool,
 }
 
+pub struct DataStore {
+    inner: Arc<DataStoreImpl>,
+    operation: Option<Operation>,
+}
+
+impl Clone for DataStore {
+    fn clone(&self) -> Self {
+        if let Some(operation) = self.operation.clone() {
+            if let Err(e) = update_active_operations(self.name(), operation, -1) {
+                eprintln!("could not update active operations - {}", e);
+            }
+        }
+        DataStore {
+            inner: self.inner.clone(),
+            operation: self.operation.clone(),
+        }
+    }
+}
+
+impl Drop for DataStore {
+    fn drop(&mut self) {
+        if let Some(operation) = self.operation.clone() {
+            if let Err(e) = update_active_operations(self.name(), operation, -1) {
+                eprintln!("could not update active operations - {}", e);
+            }
+        }
+    }
+}
+
 impl DataStore {
     pub fn lookup_datastore(
         name: &str,
@@ -75,6 +106,7 @@ impl DataStore {
             | (Some(MaintenanceType::Offline(message)), Some(_)) => {
                 bail!("Datastore '{}' is in maintenance mode: {}", name, message);
             },
+            (_, Some(operation)) => update_active_operations(name, operation, 1)?,
             _ => {}
         }
 
@@ -85,7 +117,10 @@ impl DataStore {
             if datastore.chunk_store.base() == path &&
                 datastore.verify_new == config.verify_new.unwrap_or(false)
             {
-                return Ok(datastore.clone());
+                return Ok(Arc::new(Self {
+                    inner: datastore.clone(),
+                    operation,
+                }))
             }
         }
 
@@ -94,7 +129,10 @@ impl DataStore {
         let datastore = Arc::new(datastore);
         map.insert(name.to_string(), datastore.clone());
 
-        Ok(datastore)
+        Ok(Arc::new(Self {
+            inner: datastore,
+            operation,
+        }))
     }
 
     /// removes all datastores that are not configured anymore
@@ -109,7 +147,7 @@ impl DataStore {
         Ok(())
     }
 
-    fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result<Self, Error> {
+    fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result<DataStoreImpl, Error> {
         let chunk_store = ChunkStore::open(store_name, path)?;
 
         let mut gc_status_path = chunk_store.base_path();
@@ -127,7 +165,7 @@ impl DataStore {
             GarbageCollectionStatus::default()
         };
 
-        Ok(Self {
+        Ok(DataStoreImpl {
             chunk_store: Arc::new(chunk_store),
             gc_mutex: Mutex::new(()),
             last_gc_status: Mutex::new(gc_status),
@@ -141,19 +179,19 @@ impl DataStore {
         impl Iterator<Item = (Result<proxmox_sys::fs::ReadDirEntry, Error>, usize, bool)>,
         Error
     > {
-        self.chunk_store.get_chunk_iterator()
+        self.inner.chunk_store.get_chunk_iterator()
     }
 
     pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> {
 
-        let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
+        let index = FixedIndexWriter::create(self.inner.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
 
         Ok(index)
     }
 
     pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {
 
-        let full_path =  self.chunk_store.relative_path(filename.as_ref());
+        let full_path =  self.inner.chunk_store.relative_path(filename.as_ref());
 
         let index = FixedIndexReader::open(&full_path)?;
 
@@ -165,14 +203,14 @@ impl DataStore {
     ) -> Result<DynamicIndexWriter, Error> {
 
         let index = DynamicIndexWriter::create(
-            self.chunk_store.clone(), filename.as_ref())?;
+            self.inner.chunk_store.clone(), filename.as_ref())?;
 
         Ok(index)
     }
 
     pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {
 
-        let full_path =  self.chunk_store.relative_path(filename.as_ref());
+        let full_path =  self.inner.chunk_store.relative_path(filename.as_ref());
 
         let index = DynamicIndexReader::open(&full_path)?;
 
@@ -222,11 +260,11 @@ impl DataStore {
     }
 
     pub fn name(&self) -> &str {
-        self.chunk_store.name()
+        self.inner.chunk_store.name()
     }
 
     pub fn base_path(&self) -> PathBuf {
-        self.chunk_store.base_path()
+        self.inner.chunk_store.base_path()
     }
 
     /// Cleanup a backup directory
@@ -529,7 +567,7 @@ impl DataStore {
             worker.check_abort()?;
             worker.fail_on_shutdown()?;
             let digest = index.index_digest(pos).unwrap();
-            if !self.chunk_store.cond_touch_chunk(digest, false)? {
+            if !self.inner.chunk_store.cond_touch_chunk(digest, false)? {
                 task_warn!(
                     worker,
                     "warning: unable to access non-existent chunk {}, required by {:?}",
@@ -545,7 +583,7 @@ impl DataStore {
                     let mut bad_path = PathBuf::new();
                     bad_path.push(self.chunk_path(digest).0);
                     bad_path.set_extension(bad_ext);
-                    self.chunk_store.cond_touch_path(&bad_path, false)?;
+                    self.inner.chunk_store.cond_touch_path(&bad_path, false)?;
                 }
             }
         }
@@ -625,24 +663,24 @@ impl DataStore {
     }
 
     pub fn last_gc_status(&self) -> GarbageCollectionStatus {
-        self.last_gc_status.lock().unwrap().clone()
+        self.inner.last_gc_status.lock().unwrap().clone()
     }
 
     pub fn garbage_collection_running(&self) -> bool {
-        !matches!(self.gc_mutex.try_lock(), Ok(_))
+        !matches!(self.inner.gc_mutex.try_lock(), Ok(_))
     }
 
     pub fn garbage_collection(&self, worker: &dyn WorkerTaskContext, upid: &UPID) -> Result<(), Error> {
 
-        if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
+        if let Ok(ref mut _mutex) = self.inner.gc_mutex.try_lock() {
 
             // avoids that we run GC if an old daemon process has still a
             // running backup writer, which is not save as we have no "oldest
             // writer" information and thus no safe atime cutoff
-            let _exclusive_lock =  self.chunk_store.try_exclusive_lock()?;
+            let _exclusive_lock =  self.inner.chunk_store.try_exclusive_lock()?;
 
             let phase1_start_time = proxmox_time::epoch_i64();
-            let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
+            let oldest_writer = self.inner.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
 
             let mut gc_status = GarbageCollectionStatus::default();
             gc_status.upid = Some(upid.to_string());
@@ -652,7 +690,7 @@ impl DataStore {
             self.mark_used_chunks(&mut gc_status, worker)?;
 
             task_log!(worker, "Start GC phase2 (sweep unused chunks)");
-            self.chunk_store.sweep_unused_chunks(
+            self.inner.chunk_store.sweep_unused_chunks(
                 oldest_writer,
                 phase1_start_time,
                 &mut gc_status,
@@ -729,7 +767,7 @@ impl DataStore {
                 let _ = replace_file(path, serialized.as_bytes(), options, false);
             }
 
-            *self.last_gc_status.lock().unwrap() = gc_status;
+            *self.inner.last_gc_status.lock().unwrap() = gc_status;
 
         } else {
             bail!("Start GC failed - (already running/locked)");
@@ -739,15 +777,15 @@ impl DataStore {
     }
 
     pub fn try_shared_chunk_store_lock(&self) -> Result<ProcessLockSharedGuard, Error> {
-        self.chunk_store.try_shared_lock()
+        self.inner.chunk_store.try_shared_lock()
     }
 
     pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) {
-        self.chunk_store.chunk_path(digest)
+        self.inner.chunk_store.chunk_path(digest)
     }
 
     pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> {
-        self.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
+        self.inner.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
     }
 
     pub fn insert_chunk(
@@ -755,7 +793,7 @@ impl DataStore {
         chunk: &DataBlob,
         digest: &[u8; 32],
     ) -> Result<(bool, u64), Error> {
-        self.chunk_store.insert_chunk(chunk, digest)
+        self.inner.chunk_store.insert_chunk(chunk, digest)
     }
 
     pub fn load_blob(&self, backup_dir: &BackupDir, filename: &str) -> Result<DataBlob, Error> {
@@ -771,13 +809,13 @@ impl DataStore {
 
 
     pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> {
-        let (chunk_path, _digest_str) = self.chunk_store.chunk_path(digest);
+        let (chunk_path, _digest_str) = self.inner.chunk_store.chunk_path(digest);
         std::fs::metadata(chunk_path).map_err(Error::from)
     }
 
     pub fn load_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
 
-        let (chunk_path, digest_str) = self.chunk_store.chunk_path(digest);
+        let (chunk_path, digest_str) = self.inner.chunk_store.chunk_path(digest);
 
         proxmox_lang::try_block!({
             let mut file = std::fs::File::open(&chunk_path)?;
@@ -891,7 +929,7 @@ impl DataStore {
     }
 
     pub fn verify_new(&self) -> bool {
-        self.verify_new
+        self.inner.verify_new
     }
 
     /// returns a list of chunks sorted by their inode number on disk
diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
index d50a64a5..131040c6 100644
--- a/pbs-datastore/src/lib.rs
+++ b/pbs-datastore/src/lib.rs
@@ -145,6 +145,9 @@
 // Note: .pcat1 => Proxmox Catalog Format version 1
 pub const CATALOG_NAME: &str = "catalog.pcat1.didx";
 
+/// Directory path where active operations counters are saved.
+pub const ACTIVE_OPERATIONS_DIR: &str = concat!(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), "/active-operations");
+
 #[macro_export]
 macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 {
     () => {
@@ -179,6 +182,7 @@ pub mod paperkey;
 pub mod prune;
 pub mod read_chunk;
 pub mod store_progress;
+pub mod task_tracking;
 
 pub mod dynamic_index;
 pub mod fixed_index;
diff --git a/pbs-datastore/src/task_tracking.rs b/pbs-datastore/src/task_tracking.rs
new file mode 100644
index 00000000..06266e55
--- /dev/null
+++ b/pbs-datastore/src/task_tracking.rs
@@ -0,0 +1,62 @@
+use std::path::PathBuf;
+use anyhow::Error;
+use libc::pid_t;
+
+use proxmox_sys::fs::{CreateOptions, file_read_optional_string, open_file_locked, replace_file};
+use proxmox_sys::linux::procfs;
+use pbs_api_types::Operation;
+use serde::{Deserialize, Serialize};
+
+#[derive(Deserialize, Serialize)]
+struct TaskOperations {
+    pid: u32,
+    reading_operations: i64,
+    writing_operations: i64,
+}
+
+pub fn update_active_operations(name: &str, operation: Operation, count: i64) -> Result<(), Error> {
+    let path = PathBuf::from(format!("{}/{}", crate::ACTIVE_OPERATIONS_DIR, name));
+    let lock_path = PathBuf::from(format!("{}/{}.lock", crate::ACTIVE_OPERATIONS_DIR, name));
+
+    let user = pbs_config::backup_user()?;
+    let options = CreateOptions::new()
+        .group(user.gid)
+        .owner(user.uid)
+        .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+    let timeout = std::time::Duration::new(10, 0);
+    open_file_locked(&lock_path, timeout, true, options.clone())?;
+
+    let pid = std::process::id();
+    let updated = match file_read_optional_string(&path)? {
+        Some(data) => {
+            let mut active_tasks: Vec<TaskOperations> = serde_json::from_str(&data)?;
+            for task in active_tasks
+                .iter_mut()
+                .filter(|task|
+                    procfs::check_process_running(task.pid as pid_t).is_some()) {
+                if task.pid == pid {
+                    match operation {
+                        Operation::Read => task.reading_operations += count,
+                        Operation::Write => task.writing_operations += count,
+                    }
+                }
+            }
+            active_tasks
+        }
+        None => vec!(match operation {
+            Operation::Read => TaskOperations {
+                pid,
+                reading_operations: 1,
+                writing_operations: 0,
+            },
+            Operation::Write => TaskOperations {
+                pid,
+                reading_operations: 0,
+                writing_operations: 1,
+            }
+        }),
+    };
+
+    replace_file(&path, serde_json::to_string(&updated)?.as_bytes(), options, false)
+}
\ No newline at end of file
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index e6fc5f23..445994dc 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -73,6 +73,7 @@ async fn run() -> Result<(), Error> {
 
     proxmox_backup::server::create_run_dir()?;
     proxmox_backup::server::create_state_dir()?;
+    proxmox_backup::server::create_active_operations_dir()?;
     proxmox_backup::server::jobstate::create_jobstate_dir()?;
     proxmox_backup::tape::create_tape_status_dir()?;
     proxmox_backup::tape::create_drive_state_dir()?;
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 2b54cdf0..5a9884e9 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -4,7 +4,7 @@
 //! services. We want async IO, so this is built on top of
 //! tokio/hyper.
 
-use anyhow::Error;
+use anyhow::{format_err, Error};
 use serde_json::Value;
 
 use proxmox_sys::fs::{create_path, CreateOptions};
@@ -71,3 +71,17 @@ pub fn create_state_dir() -> Result<(), Error> {
     create_path(pbs_buildcfg::PROXMOX_BACKUP_STATE_DIR_M!(), None, Some(opts))?;
     Ok(())
 }
+
+/// Create active operations dir with correct permission.
+pub fn create_active_operations_dir() -> Result<(), Error> {
+    let backup_user = pbs_config::backup_user()?;
+    let mode = nix::sys::stat::Mode::from_bits_truncate(0o0750);
+    let options = CreateOptions::new()
+        .perm(mode)
+        .owner(backup_user.uid)
+        .group(backup_user.gid);
+
+    create_path(pbs_datastore::ACTIVE_OPERATIONS_DIR, None, Some(options))
+        .map_err(|err: Error| format_err!("unable to create active operations dir - {}", err))?;
+    Ok(())
+}
-- 
2.30.2





^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2022-01-31 14:45 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2022-01-24 13:44 [pbs-devel] [PATCH proxmox-backup v5 3/8] pbs-datastore: add active operations tracking Hannes Laimer
2022-01-31 14:44 ` Dominik Csapak
  -- strict thread matches above, loose matches on Subject: below --
2022-01-24 12:31 [pbs-devel] [PATCH proxmox-backup v5 0/8] closes #3071: maintenance mode for datastore Hannes Laimer
2022-01-24 12:31 ` [pbs-devel] [PATCH proxmox-backup v5 3/8] pbs-datastore: add active operations tracking Hannes Laimer

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal