From: Hannes Laimer <h.laimer@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup v7 3/6] pbs-datastore: add active operations tracking
Date: Fri, 4 Feb 2022 11:17:26 +0000 [thread overview]
Message-ID: <20220204111729.22107-4-h.laimer@proxmox.com> (raw)
In-Reply-To: <20220204111729.22107-1-h.laimer@proxmox.com>
Saves the currently active read/write operation counts in a file. The
file is updated whenever lookup_datastore hands out a new reference
and whenever such a reference is dropped. The files are locked before
every access; there is one file per datastore.
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
pbs-api-types/src/maintenance.rs | 1 +
pbs-datastore/Cargo.toml | 1 +
pbs-datastore/src/datastore.rs | 114 +++++++++++++++++++----------
pbs-datastore/src/lib.rs | 4 +
pbs-datastore/src/task_tracking.rs | 81 ++++++++++++++++++++
src/bin/proxmox-backup-api.rs | 1 +
src/server/mod.rs | 16 +++-
7 files changed, 180 insertions(+), 38 deletions(-)
create mode 100644 pbs-datastore/src/task_tracking.rs
diff --git a/pbs-api-types/src/maintenance.rs b/pbs-api-types/src/maintenance.rs
index 4d3ccb3b..1b5753d2 100644
--- a/pbs-api-types/src/maintenance.rs
+++ b/pbs-api-types/src/maintenance.rs
@@ -25,6 +25,7 @@ pub enum MaintenanceType {
Offline(String),
}
+#[derive(Copy ,Clone)]
/// Operation requirments, used when checking for maintenance mode.
pub enum Operation {
Read,
diff --git a/pbs-datastore/Cargo.toml b/pbs-datastore/Cargo.toml
index d8cefa00..0c703d44 100644
--- a/pbs-datastore/Cargo.toml
+++ b/pbs-datastore/Cargo.toml
@@ -35,5 +35,6 @@ proxmox-uuid = "1"
proxmox-sys = "0.2"
pbs-api-types = { path = "../pbs-api-types" }
+pbs-buildcfg = { path = "../pbs-buildcfg" }
pbs-tools = { path = "../pbs-tools" }
pbs-config = { path = "../pbs-config" }
diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
index c65d4574..47a52084 100644
--- a/pbs-datastore/src/datastore.rs
+++ b/pbs-datastore/src/datastore.rs
@@ -9,11 +9,12 @@ use std::time::Duration;
use anyhow::{bail, format_err, Error};
use lazy_static::lazy_static;
-use proxmox_sys::fs::{replace_file, file_read_optional_string, CreateOptions};
+use proxmox_sys::fs::{replace_file, file_read_optional_string,
+ CreateOptions, lock_dir_noblock, DirLockGuard,
+};
use proxmox_sys::process_locker::ProcessLockSharedGuard;
use proxmox_sys::WorkerTaskContext;
use proxmox_sys::{task_log, task_warn};
-use proxmox_sys::fs::{lock_dir_noblock, DirLockGuard};
use pbs_api_types::{
UPID, DataStoreConfig, Authid, MaintenanceType, Operation, GarbageCollectionStatus, HumanByte
@@ -31,9 +32,10 @@ use crate::manifest::{
ArchiveType, BackupManifest,
archive_type,
};
+use crate::task_tracking::update_active_operations;
lazy_static! {
- static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
+ static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStoreImpl>>> = Mutex::new(HashMap::new());
}
/// checks if auth_id is owner, or, if owner is a token, if
@@ -54,13 +56,42 @@ pub fn check_backup_owner(
///
/// A Datastore can store severals backups, and provides the
/// management interface for backup.
-pub struct DataStore {
+pub struct DataStoreImpl {
chunk_store: Arc<ChunkStore>,
gc_mutex: Mutex<()>,
last_gc_status: Mutex<GarbageCollectionStatus>,
verify_new: bool,
}
+pub struct DataStore {
+ inner: Arc<DataStoreImpl>,
+ operation: Option<Operation>,
+}
+
+impl Clone for DataStore {
+ fn clone(&self) -> Self {
+ if let Some(operation) = self.operation.clone() {
+ if let Err(e) = update_active_operations(self.name(), operation, 1) {
+ eprintln!("could not update active operations - {}", e);
+ }
+ }
+ DataStore {
+ inner: self.inner.clone(),
+ operation: self.operation.clone(),
+ }
+ }
+}
+
+impl Drop for DataStore {
+ fn drop(&mut self) {
+ if let Some(operation) = self.operation.clone() {
+ if let Err(e) = update_active_operations(self.name(), operation, -1) {
+ eprintln!("could not update active operations - {}", e);
+ }
+ }
+ }
+}
+
impl DataStore {
pub fn lookup_datastore(
name: &str,
@@ -70,12 +101,15 @@ impl DataStore {
let config: DataStoreConfig = config.lookup("datastore", name)?;
let path = PathBuf::from(&config.path);
- match (&config.maintenance_type, operation) {
- (Some(MaintenanceType::ReadOnly(message)), Some(Operation::Write))
- | (Some(MaintenanceType::Offline(message)), Some(_)) => {
- bail!("Datastore '{}' is in maintenance mode: {}", name, message);
- },
- _ => {}
+ if let Some(MaintenanceType::Offline(message)) = &config.maintenance_type {
+ bail!("datastore '{}' is in offline maintenance mode: {}", name, message);
+ } else if let Some(MaintenanceType::ReadOnly(message)) = &config.maintenance_type {
+ if let Some(Operation::Write) = operation {
+ bail!("datastore '{}' is in read-only maintenance mode: {}", name, message);
+ }
+ }
+ if let Some(op) = operation {
+ update_active_operations(name, op, 1)?;
}
let mut map = DATASTORE_MAP.lock().unwrap();
@@ -85,7 +119,10 @@ impl DataStore {
if datastore.chunk_store.base() == path &&
datastore.verify_new == config.verify_new.unwrap_or(false)
{
- return Ok(datastore.clone());
+ return Ok(Arc::new(Self {
+ inner: datastore.clone(),
+ operation,
+ }))
}
}
@@ -94,7 +131,10 @@ impl DataStore {
let datastore = Arc::new(datastore);
map.insert(name.to_string(), datastore.clone());
- Ok(datastore)
+ Ok(Arc::new(Self {
+ inner: datastore,
+ operation,
+ }))
}
/// removes all datastores that are not configured anymore
@@ -109,7 +149,7 @@ impl DataStore {
Ok(())
}
- fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result<Self, Error> {
+ fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result<DataStoreImpl, Error> {
let chunk_store = ChunkStore::open(store_name, path)?;
let mut gc_status_path = chunk_store.base_path();
@@ -127,7 +167,7 @@ impl DataStore {
GarbageCollectionStatus::default()
};
- Ok(Self {
+ Ok(DataStoreImpl {
chunk_store: Arc::new(chunk_store),
gc_mutex: Mutex::new(()),
last_gc_status: Mutex::new(gc_status),
@@ -141,19 +181,19 @@ impl DataStore {
impl Iterator<Item = (Result<proxmox_sys::fs::ReadDirEntry, Error>, usize, bool)>,
Error
> {
- self.chunk_store.get_chunk_iterator()
+ self.inner.chunk_store.get_chunk_iterator()
}
pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> {
- let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
+ let index = FixedIndexWriter::create(self.inner.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
Ok(index)
}
pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {
- let full_path = self.chunk_store.relative_path(filename.as_ref());
+ let full_path = self.inner.chunk_store.relative_path(filename.as_ref());
let index = FixedIndexReader::open(&full_path)?;
@@ -165,14 +205,14 @@ impl DataStore {
) -> Result<DynamicIndexWriter, Error> {
let index = DynamicIndexWriter::create(
- self.chunk_store.clone(), filename.as_ref())?;
+ self.inner.chunk_store.clone(), filename.as_ref())?;
Ok(index)
}
pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {
- let full_path = self.chunk_store.relative_path(filename.as_ref());
+ let full_path = self.inner.chunk_store.relative_path(filename.as_ref());
let index = DynamicIndexReader::open(&full_path)?;
@@ -222,11 +262,11 @@ impl DataStore {
}
pub fn name(&self) -> &str {
- self.chunk_store.name()
+ self.inner.chunk_store.name()
}
pub fn base_path(&self) -> PathBuf {
- self.chunk_store.base_path()
+ self.inner.chunk_store.base_path()
}
/// Cleanup a backup directory
@@ -529,7 +569,7 @@ impl DataStore {
worker.check_abort()?;
worker.fail_on_shutdown()?;
let digest = index.index_digest(pos).unwrap();
- if !self.chunk_store.cond_touch_chunk(digest, false)? {
+ if !self.inner.chunk_store.cond_touch_chunk(digest, false)? {
task_warn!(
worker,
"warning: unable to access non-existent chunk {}, required by {:?}",
@@ -545,7 +585,7 @@ impl DataStore {
let mut bad_path = PathBuf::new();
bad_path.push(self.chunk_path(digest).0);
bad_path.set_extension(bad_ext);
- self.chunk_store.cond_touch_path(&bad_path, false)?;
+ self.inner.chunk_store.cond_touch_path(&bad_path, false)?;
}
}
}
@@ -625,24 +665,24 @@ impl DataStore {
}
pub fn last_gc_status(&self) -> GarbageCollectionStatus {
- self.last_gc_status.lock().unwrap().clone()
+ self.inner.last_gc_status.lock().unwrap().clone()
}
pub fn garbage_collection_running(&self) -> bool {
- !matches!(self.gc_mutex.try_lock(), Ok(_))
+ !matches!(self.inner.gc_mutex.try_lock(), Ok(_))
}
pub fn garbage_collection(&self, worker: &dyn WorkerTaskContext, upid: &UPID) -> Result<(), Error> {
- if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
+ if let Ok(ref mut _mutex) = self.inner.gc_mutex.try_lock() {
// avoids that we run GC if an old daemon process has still a
// running backup writer, which is not save as we have no "oldest
// writer" information and thus no safe atime cutoff
- let _exclusive_lock = self.chunk_store.try_exclusive_lock()?;
+ let _exclusive_lock = self.inner.chunk_store.try_exclusive_lock()?;
let phase1_start_time = proxmox_time::epoch_i64();
- let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
+ let oldest_writer = self.inner.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
let mut gc_status = GarbageCollectionStatus::default();
gc_status.upid = Some(upid.to_string());
@@ -652,7 +692,7 @@ impl DataStore {
self.mark_used_chunks(&mut gc_status, worker)?;
task_log!(worker, "Start GC phase2 (sweep unused chunks)");
- self.chunk_store.sweep_unused_chunks(
+ self.inner.chunk_store.sweep_unused_chunks(
oldest_writer,
phase1_start_time,
&mut gc_status,
@@ -729,7 +769,7 @@ impl DataStore {
let _ = replace_file(path, serialized.as_bytes(), options, false);
}
- *self.last_gc_status.lock().unwrap() = gc_status;
+ *self.inner.last_gc_status.lock().unwrap() = gc_status;
} else {
bail!("Start GC failed - (already running/locked)");
@@ -739,15 +779,15 @@ impl DataStore {
}
pub fn try_shared_chunk_store_lock(&self) -> Result<ProcessLockSharedGuard, Error> {
- self.chunk_store.try_shared_lock()
+ self.inner.chunk_store.try_shared_lock()
}
pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) {
- self.chunk_store.chunk_path(digest)
+ self.inner.chunk_store.chunk_path(digest)
}
pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> {
- self.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
+ self.inner.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
}
pub fn insert_chunk(
@@ -755,7 +795,7 @@ impl DataStore {
chunk: &DataBlob,
digest: &[u8; 32],
) -> Result<(bool, u64), Error> {
- self.chunk_store.insert_chunk(chunk, digest)
+ self.inner.chunk_store.insert_chunk(chunk, digest)
}
pub fn load_blob(&self, backup_dir: &BackupDir, filename: &str) -> Result<DataBlob, Error> {
@@ -771,13 +811,13 @@ impl DataStore {
pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> {
- let (chunk_path, _digest_str) = self.chunk_store.chunk_path(digest);
+ let (chunk_path, _digest_str) = self.inner.chunk_store.chunk_path(digest);
std::fs::metadata(chunk_path).map_err(Error::from)
}
pub fn load_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
- let (chunk_path, digest_str) = self.chunk_store.chunk_path(digest);
+ let (chunk_path, digest_str) = self.inner.chunk_store.chunk_path(digest);
proxmox_lang::try_block!({
let mut file = std::fs::File::open(&chunk_path)?;
@@ -891,7 +931,7 @@ impl DataStore {
}
pub fn verify_new(&self) -> bool {
- self.verify_new
+ self.inner.verify_new
}
/// returns a list of chunks sorted by their inode number on disk
diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
index d50a64a5..131040c6 100644
--- a/pbs-datastore/src/lib.rs
+++ b/pbs-datastore/src/lib.rs
@@ -145,6 +145,9 @@
// Note: .pcat1 => Proxmox Catalog Format version 1
pub const CATALOG_NAME: &str = "catalog.pcat1.didx";
+/// Directory path where active operations counters are saved.
+pub const ACTIVE_OPERATIONS_DIR: &str = concat!(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), "/active-operations");
+
#[macro_export]
macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 {
() => {
@@ -179,6 +182,7 @@ pub mod paperkey;
pub mod prune;
pub mod read_chunk;
pub mod store_progress;
+pub mod task_tracking;
pub mod dynamic_index;
pub mod fixed_index;
diff --git a/pbs-datastore/src/task_tracking.rs b/pbs-datastore/src/task_tracking.rs
new file mode 100644
index 00000000..a02d9a17
--- /dev/null
+++ b/pbs-datastore/src/task_tracking.rs
@@ -0,0 +1,81 @@
+use anyhow::Error;
+use libc::pid_t;
+use nix::unistd::Pid;
+use std::path::PathBuf;
+
+use pbs_api_types::Operation;
+use proxmox_sys::fs::{file_read_optional_string, open_file_locked, replace_file, CreateOptions};
+use proxmox_sys::linux::procfs;
+use serde::{Deserialize, Serialize};
+
+#[derive(Deserialize, Serialize, Clone)]
+struct TaskOperations {
+ pid: u32,
+ starttime: u64,
+ reading_operations: i64,
+ writing_operations: i64,
+}
+
+pub fn update_active_operations(name: &str, operation: Operation, count: i64) -> Result<(), Error> {
+ let path = PathBuf::from(format!("{}/{}", crate::ACTIVE_OPERATIONS_DIR, name));
+ let lock_path = PathBuf::from(format!("{}/{}.lock", crate::ACTIVE_OPERATIONS_DIR, name));
+
+ let user = pbs_config::backup_user()?;
+ let options = CreateOptions::new()
+ .group(user.gid)
+ .owner(user.uid)
+ .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+ let timeout = std::time::Duration::new(10, 0);
+ open_file_locked(&lock_path, timeout, true, options.clone())?;
+
+ let pid = std::process::id();
+ let starttime = procfs::PidStat::read_from_pid(Pid::from_raw(pid as pid_t))?.starttime;
+ let mut updated = false;
+
+ let mut updated_tasks: Vec<TaskOperations> = match file_read_optional_string(&path)? {
+ Some(data) => serde_json::from_str::<Vec<TaskOperations>>(&data)?
+ .iter_mut()
+ .filter_map(
+ |task| match procfs::check_process_running(task.pid as pid_t) {
+ Some(stat) if pid == task.pid && stat.starttime != task.starttime => None,
+ Some(_) => {
+ if pid == task.pid {
+ updated = true;
+ match operation {
+ Operation::Read => task.reading_operations += count,
+ Operation::Write => task.writing_operations += count,
+ };
+ }
+ Some(task.clone())
+ }
+ _ => None,
+ },
+ )
+ .collect(),
+ None => Vec::new(),
+ };
+
+ if !updated {
+ updated_tasks.push(match operation {
+ Operation::Read => TaskOperations {
+ pid,
+ starttime,
+ reading_operations: 1,
+ writing_operations: 0,
+ },
+ Operation::Write => TaskOperations {
+ pid,
+ starttime,
+ reading_operations: 0,
+ writing_operations: 1,
+ },
+ })
+ }
+ replace_file(
+ &path,
+ serde_json::to_string(&updated_tasks)?.as_bytes(),
+ options,
+ false,
+ )
+}
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index ee037a3b..68aa4863 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -75,6 +75,7 @@ async fn run() -> Result<(), Error> {
proxmox_backup::server::create_run_dir()?;
proxmox_backup::server::create_state_dir()?;
+ proxmox_backup::server::create_active_operations_dir()?;
proxmox_backup::server::jobstate::create_jobstate_dir()?;
proxmox_backup::tape::create_tape_status_dir()?;
proxmox_backup::tape::create_drive_state_dir()?;
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 2b54cdf0..5a9884e9 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -4,7 +4,7 @@
//! services. We want async IO, so this is built on top of
//! tokio/hyper.
-use anyhow::Error;
+use anyhow::{format_err, Error};
use serde_json::Value;
use proxmox_sys::fs::{create_path, CreateOptions};
@@ -71,3 +71,17 @@ pub fn create_state_dir() -> Result<(), Error> {
create_path(pbs_buildcfg::PROXMOX_BACKUP_STATE_DIR_M!(), None, Some(opts))?;
Ok(())
}
+
+/// Create active operations dir with correct permission.
+pub fn create_active_operations_dir() -> Result<(), Error> {
+ let backup_user = pbs_config::backup_user()?;
+ let mode = nix::sys::stat::Mode::from_bits_truncate(0o0750);
+ let options = CreateOptions::new()
+ .perm(mode)
+ .owner(backup_user.uid)
+ .group(backup_user.gid);
+
+ create_path(pbs_datastore::ACTIVE_OPERATIONS_DIR, None, Some(options))
+ .map_err(|err: Error| format_err!("unable to create active operations dir - {}", err))?;
+ Ok(())
+}
--
2.30.2
next prev parent reply other threads:[~2022-02-04 11:19 UTC|newest]
Thread overview: 10+ messages / expand[flat|nested] mbox.gz Atom feed top
2022-02-04 11:17 [pbs-devel] [PATCH proxmox-backup v7 0/6] closes #3071: maintenance mode for datastore Hannes Laimer
2022-02-04 11:17 ` [pbs-devel] [PATCH proxmox-backup v7 1/6] api-types: add maintenance type Hannes Laimer
2022-02-08 9:40 ` Wolfgang Bumiller
2022-02-04 11:17 ` [pbs-devel] [PATCH proxmox-backup v7 2/6] datastore: add check for maintenance in lookup Hannes Laimer
2022-02-08 9:43 ` Wolfgang Bumiller
2022-02-04 11:17 ` Hannes Laimer [this message]
2022-02-08 10:13 ` [pbs-devel] [PATCH proxmox-backup v7 3/6] pbs-datastore: add active operations tracking Wolfgang Bumiller
2022-02-04 11:17 ` [pbs-devel] [PATCH proxmox-backup v7 4/6] api: make maintenance_type updatable Hannes Laimer
2022-02-04 11:17 ` [pbs-devel] [PATCH proxmox-backup v7 5/6] api: add get_active_operations endpoint Hannes Laimer
2022-02-04 11:17 ` [pbs-devel] [PATCH proxmox-backup v7 6/6] ui: add option to change the maintenance type Hannes Laimer
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20220204111729.22107-4-h.laimer@proxmox.com \
--to=h.laimer@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox