From: Dominik Csapak <d.csapak@proxmox.com>
To: Proxmox Backup Server development discussion
	<pbs-devel@lists.proxmox.com>,
	Hannes Laimer <h.laimer@proxmox.com>
Subject: Re: [pbs-devel] [PATCH v4 proxmox-backup 3/6] pbs-datastore: add active operations tracking
Date: Fri, 19 Nov 2021 16:21:04 +0100
Message-ID: <1e834eb8-f3f8-00e6-ca9b-37e0e86a003c@proxmox.com>
In-Reply-To: <20211112123019.15384-4-h.laimer@proxmox.com>

comments inline

On 11/12/21 13:30, Hannes Laimer wrote:
> Saves the currently active read/write operation counts in a file. The
> file is updated whenever a reference returned by lookup_datastore is
> dropped and whenever a reference is returned by lookup_datastore. The
> files are locked before every access, there is one file per datastore.
> ---
> v3->v4: implement Clone for the DataStore wrapper struct. So clones are
> tracked correctly. On every update of the file, the lines of inactive
> processes are removed. A line looks like this:
> <pid> <read> <write>
> So, PID has <read>/<write> active read/write operations. This is needed
> because if we just had one number for read/write and some process would
> die, we are not able to find out how many operations were ended with
> it. Like this we only consider lines that have a PID that belongs to an
> active process.
> 
>   pbs-api-types/src/maintenance.rs |   1 +
>   pbs-datastore/Cargo.toml         |   1 +
>   pbs-datastore/src/datastore.rs   | 166 +++++++++++++++++++++++++------
>   pbs-datastore/src/lib.rs         |   3 +
>   src/bin/proxmox-backup-api.rs    |   1 +
>   src/server/mod.rs                |  16 ++-
>   6 files changed, 158 insertions(+), 30 deletions(-)
> 
> diff --git a/pbs-api-types/src/maintenance.rs b/pbs-api-types/src/maintenance.rs
> index f816b279..98e3ec62 100644
> --- a/pbs-api-types/src/maintenance.rs
> +++ b/pbs-api-types/src/maintenance.rs
> @@ -25,6 +25,7 @@ pub enum MaintenanceType {
>       Offline(String),
>   }
>   
> +#[derive(Copy ,Clone)]
>   /// Operation requirments, used when checking for maintenance mode.
>   pub enum Operation {
>       Read,
> diff --git a/pbs-datastore/Cargo.toml b/pbs-datastore/Cargo.toml
> index 01c5ee00..2063c1bb 100644
> --- a/pbs-datastore/Cargo.toml
> +++ b/pbs-datastore/Cargo.toml
> @@ -34,5 +34,6 @@ proxmox-time = "1"
>   proxmox-uuid = "1"
>   
>   pbs-api-types = { path = "../pbs-api-types" }
> +pbs-buildcfg = { path = "../pbs-buildcfg" }
>   pbs-tools = { path = "../pbs-tools" }
>   pbs-config = { path = "../pbs-config" }
> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
> index 064fd273..d8c2f3f5 100644
> --- a/pbs-datastore/src/datastore.rs
> +++ b/pbs-datastore/src/datastore.rs
> @@ -9,6 +9,7 @@ use std::time::Duration;
>   use anyhow::{bail, format_err, Error};
>   use lazy_static::lazy_static;
>   
> +use proxmox::sys::linux::procfs;
>   use proxmox::tools::fs::{replace_file, file_read_optional_string, CreateOptions};
>   
>   use pbs_api_types::{UPID, DataStoreConfig, Authid, GarbageCollectionStatus, MaintenanceType, Operation};
> @@ -31,7 +32,7 @@ use crate::manifest::{
>   };
>   
>   lazy_static! {
> -    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
> +    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStoreImpl>>> = Mutex::new(HashMap::new());
>   }
>   
>   /// checks if auth_id is owner, or, if owner is a token, if
> @@ -52,13 +53,113 @@ pub fn check_backup_owner(
>   ///
>   /// A Datastore can store severals backups, and provides the
>   /// management interface for backup.
> -pub struct DataStore {
> +pub struct DataStoreImpl {
>       chunk_store: Arc<ChunkStore>,
>       gc_mutex: Mutex<()>,
>       last_gc_status: Mutex<GarbageCollectionStatus>,
>       verify_new: bool,
>   }
>   
> +pub struct DataStore {
> +    inner: Arc<DataStoreImpl>,
> +    operation: Option<Operation>,
> +}
> +
> +impl Clone for DataStore {
> +    fn clone(&self) -> Self {
> +        if let Some(operation) = self.operation.clone() {
> +            if let Err(e) = update_active_operations(self.name(), operation, -1) {
> +                eprintln!("could not update active operations - {}", e);
> +            }
> +        }
> +        DataStore {
> +            inner: self.inner.clone(),
> +            operation: self.operation.clone(),
> +        }
> +    }
> +}
> +
> +fn update_active_operations(name: &str, operation: Operation, count: i64) -> Result<(), Error> {
> +    let mut path = PathBuf::from(crate::ACTIVE_OPERATIONS_DIR);
> +    let mut lock_path = PathBuf::from(crate::ACTIVE_OPERATIONS_DIR);
> +    path.push(name);
> +    lock_path.push(format!(".{}.lock", name));
> +
> +    let user = pbs_config::backup_user()?;
> +    let options = proxmox::tools::fs::CreateOptions::new()
> +        .group(user.gid)
> +        .owner(user.uid)
> +        .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
> +
> +    let timeout = std::time::Duration::new(10, 0);
> +    proxmox::tools::fs::open_file_locked(&lock_path, timeout, true, options.clone())?;
> +
> +    let pid = std::process::id() as i64;
> +    proxmox::tools::fs::replace_file(
> +        &path,
> +        match proxmox::tools::fs::file_get_optional_contents(&path) {

you could do that outside of the replace_file call, which removes a level of
indentation
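
i.e. roughly (untested sketch; 'update_content'/'initial_content' are just
placeholders for the code that currently lives inside the match arms):

    // build the new file content up front ...
    let new_content = match proxmox::tools::fs::file_get_optional_contents(&path) {
        Ok(Some(data)) => update_content(&data, pid, operation, count),
        _ => initial_content(pid, operation, count),
    };

    // ... and then write it in one go
    proxmox::tools::fs::replace_file(&path, new_content.as_bytes(), options, false)?;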

> +            Ok(Some(data)) => {
> +                let mut updated = false;
> +                let new_data = String::from_utf8(data)

we already have file_read_optional_string, so there is no need to convert
the bytes to a string ourselves

that also removes at least one level of indentation
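
i.e. something like (sketch; file_read_optional_string is already imported
at the top of datastore.rs and already hands back a String):

    match file_read_optional_string(&path) {
        Ok(Some(data)) => {
            // 'data' is already a String here, so the String::from_utf8()/
            // unwrap_or() dance goes away and data.lines() works directly
        }
        _ => {
            // no (readable) file yet
        }
    }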

> +                        .unwrap_or(String::new())
> +                        .lines()
> +                        .into_iter()
> +                        .fold(String::new(), |xs, line| {

any special reason to use fold, which allocates a new String on every
iteration via format!(), instead of simply using a loop and appending to a
string?
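
e.g. with a plain loop it could look like this (untested sketch;
'handle_line' is a made-up helper, see the sketch in the next comment
further below):

    let mut new_data = String::new();
    let mut updated = false;
    for line in data.lines() {
        if let Some(new_line) = handle_line(line, pid, operation, count, &mut updated) {
            new_data.push_str(&new_line);
            new_data.push('\n');
        }
    }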

> +                            let split = line.split(" ").collect::<Vec<&str>>();
> +                            match split[0].parse::<i32>() {
> +                                Ok(line_pid) if line_pid as i64 == pid && split.len() == 3 => {
> +                                    let stat = (
> +                                        split[1].parse::<i64>().unwrap_or(0),
> +                                        split[2].parse::<i64>().unwrap_or(0),
> +                                    );
> +                                    updated = true;
> +                                    match (operation, stat) {
> +                                        (Operation::Write, (r, w)) => {
> +                                            format!("{}\n{} {} {}", xs, pid, r, w + count)
> +                                        }
> +                                        (Operation::Read, (r, w)) => {
> +                                            format!("{}\n{} {} {}", xs, pid, r + count, w)
> +                                        }
> +                                    }
> +                                }
> +                                Ok(line_pid)
> +                                    if procfs::check_process_running(line_pid).is_some()
> +                                        && split.len() == 3 =>
> +                                {
> +                                    format!("{}\n{}", xs, line)
> +                                }
> +                                _ => xs,

imho, the line parsing/printing should be its own function, that way one can
more easily reason about the behaviour...
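
e.g. something along these lines (rough, untested sketch; name and signature
are made up, but it should keep the current behaviour: update the counters
of our own pid, keep lines of other still-running pids, drop the rest):

    /// updates/keeps/drops a single '<pid> <read> <write>' line
    fn handle_line(
        line: &str,
        pid: i64,
        operation: Operation,
        count: i64,
        updated: &mut bool,
    ) -> Option<String> {
        let parts: Vec<&str> = line.split_ascii_whitespace().collect();
        if parts.len() != 3 {
            return None;
        }
        let line_pid: i32 = parts[0].parse().ok()?;
        let mut reads: i64 = parts[1].parse().unwrap_or(0);
        let mut writes: i64 = parts[2].parse().unwrap_or(0);

        if i64::from(line_pid) == pid {
            *updated = true;
            match operation {
                Operation::Read => reads += count,
                Operation::Write => writes += count,
            }
        } else if procfs::check_process_running(line_pid).is_none() {
            // process is gone, drop its line
            return None;
        }

        Some(format!("{} {} {}", line_pid, reads, writes))
    }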

> +                            }
> +                        });
> +                match operation {
> +                    Operation::Read if !updated => format!("{}\n{} {} {}", new_data, pid, 1, 0),
> +                    Operation::Write if !updated => format!("{}\n{} {} {}", new_data, pid, 0, 1),
> +                    _ => new_data,
> +                }
> +            }
> +            _ => match operation {
> +                Operation::Read => format!("{} {} {}", pid, 1, 0),
> +                Operation::Write => format!("{} {} {}", pid, 0, 1),
> +            },
> +        }
> +        .as_bytes(),
> +        options,
> +        false,
> +    )?;
> +
> +    Ok(())
> +}
> +
> +impl Drop for DataStore {
> +    fn drop(&mut self) {
> +        if let Some(operation) = self.operation.clone() {
> +            if let Err(e) = update_active_operations(self.name(), operation, -1) {
> +                eprintln!("could not update active operations - {}", e);
> +            }
> +        }
> +    }
> +}
> +
>   impl DataStore {
>       pub fn lookup_datastore(
>           name: &str,
> @@ -73,6 +174,7 @@ impl DataStore {
>               | (Some(MaintenanceType::Offline(message)), Some(_)) => {
>                   bail!("Datastore '{}' is in maintenance mode: {}", name, message);
>               },
> +            (_, Some(operation)) => update_active_operations(name, operation, 1)?,
>               _ => {}
>           }
>   
> @@ -83,7 +185,10 @@ impl DataStore {
>               if datastore.chunk_store.base() == path &&
>                   datastore.verify_new == config.verify_new.unwrap_or(false)
>               {
> -                return Ok(datastore.clone());
> +                return Ok(Arc::new(Self {
> +                    inner: datastore.clone(),
> +                    operation,
> +                }))
>               }
>           }
>   
> @@ -92,7 +197,10 @@ impl DataStore {
>           let datastore = Arc::new(datastore);
>           map.insert(name.to_string(), datastore.clone());
>   
> -        Ok(datastore)
> +        Ok(Arc::new(Self {
> +            inner: datastore,
> +            operation,
> +        }))
>       }
>   
>       /// removes all datastores that are not configured anymore
> @@ -107,7 +215,7 @@ impl DataStore {
>           Ok(())
>       }
>   
> -    fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result<Self, Error> {
> +    fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result<DataStoreImpl, Error> {
>           let chunk_store = ChunkStore::open(store_name, path)?;
>   
>           let mut gc_status_path = chunk_store.base_path();
> @@ -125,7 +233,7 @@ impl DataStore {
>               GarbageCollectionStatus::default()
>           };
>   
> -        Ok(Self {
> +        Ok(DataStoreImpl {
>               chunk_store: Arc::new(chunk_store),
>               gc_mutex: Mutex::new(()),
>               last_gc_status: Mutex::new(gc_status),
> @@ -139,19 +247,19 @@ impl DataStore {
>           impl Iterator<Item = (Result<pbs_tools::fs::ReadDirEntry, Error>, usize, bool)>,
>           Error
>       > {
> -        self.chunk_store.get_chunk_iterator()
> +        self.inner.chunk_store.get_chunk_iterator()
>       }
>   
>       pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> {
>   
> -        let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
> +        let index = FixedIndexWriter::create(self.inner.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
>   
>           Ok(index)
>       }
>   
>       pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {
>   
> -        let full_path =  self.chunk_store.relative_path(filename.as_ref());
> +        let full_path =  self.inner.chunk_store.relative_path(filename.as_ref());
>   
>           let index = FixedIndexReader::open(&full_path)?;
>   
> @@ -163,14 +271,14 @@ impl DataStore {
>       ) -> Result<DynamicIndexWriter, Error> {
>   
>           let index = DynamicIndexWriter::create(
> -            self.chunk_store.clone(), filename.as_ref())?;
> +            self.inner.chunk_store.clone(), filename.as_ref())?;
>   
>           Ok(index)
>       }
>   
>       pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {
>   
> -        let full_path =  self.chunk_store.relative_path(filename.as_ref());
> +        let full_path =  self.inner.chunk_store.relative_path(filename.as_ref());
>   
>           let index = DynamicIndexReader::open(&full_path)?;
>   
> @@ -220,11 +328,11 @@ impl DataStore {
>       }
>   
>       pub fn name(&self) -> &str {
> -        self.chunk_store.name()
> +        self.inner.chunk_store.name()
>       }
>   
>       pub fn base_path(&self) -> PathBuf {
> -        self.chunk_store.base_path()
> +        self.inner.chunk_store.base_path()
>       }
>   
>       /// Cleanup a backup directory
> @@ -530,7 +638,7 @@ impl DataStore {
>               worker.check_abort()?;
>               worker.fail_on_shutdown()?;
>               let digest = index.index_digest(pos).unwrap();
> -            if !self.chunk_store.cond_touch_chunk(digest, false)? {
> +            if !self.inner.chunk_store.cond_touch_chunk(digest, false)? {
>                   task_warn!(
>                       worker,
>                       "warning: unable to access non-existent chunk {}, required by {:?}",
> @@ -546,7 +654,7 @@ impl DataStore {
>                       let mut bad_path = PathBuf::new();
>                       bad_path.push(self.chunk_path(digest).0);
>                       bad_path.set_extension(bad_ext);
> -                    self.chunk_store.cond_touch_path(&bad_path, false)?;
> +                    self.inner.chunk_store.cond_touch_path(&bad_path, false)?;
>                   }
>               }
>           }
> @@ -626,24 +734,24 @@ impl DataStore {
>       }
>   
>       pub fn last_gc_status(&self) -> GarbageCollectionStatus {
> -        self.last_gc_status.lock().unwrap().clone()
> +        self.inner.last_gc_status.lock().unwrap().clone()
>       }
>   
>       pub fn garbage_collection_running(&self) -> bool {
> -        !matches!(self.gc_mutex.try_lock(), Ok(_))
> +        !matches!(self.inner.gc_mutex.try_lock(), Ok(_))
>       }
>   
>       pub fn garbage_collection(&self, worker: &dyn WorkerTaskContext, upid: &UPID) -> Result<(), Error> {
>   
> -        if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
> +        if let Ok(ref mut _mutex) = self.inner.gc_mutex.try_lock() {
>   
>               // avoids that we run GC if an old daemon process has still a
>               // running backup writer, which is not save as we have no "oldest
>               // writer" information and thus no safe atime cutoff
> -            let _exclusive_lock =  self.chunk_store.try_exclusive_lock()?;
> +            let _exclusive_lock =  self.inner.chunk_store.try_exclusive_lock()?;
>   
>               let phase1_start_time = proxmox_time::epoch_i64();
> -            let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
> +            let oldest_writer = self.inner.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
>   
>               let mut gc_status = GarbageCollectionStatus::default();
>               gc_status.upid = Some(upid.to_string());
> @@ -653,7 +761,7 @@ impl DataStore {
>               self.mark_used_chunks(&mut gc_status, worker)?;
>   
>               task_log!(worker, "Start GC phase2 (sweep unused chunks)");
> -            self.chunk_store.sweep_unused_chunks(
> +            self.inner.chunk_store.sweep_unused_chunks(
>                   oldest_writer,
>                   phase1_start_time,
>                   &mut gc_status,
> @@ -730,7 +838,7 @@ impl DataStore {
>                   let _ = replace_file(path, serialized.as_bytes(), options, false);
>               }
>   
> -            *self.last_gc_status.lock().unwrap() = gc_status;
> +            *self.inner.last_gc_status.lock().unwrap() = gc_status;
>   
>           } else {
>               bail!("Start GC failed - (already running/locked)");
> @@ -740,15 +848,15 @@ impl DataStore {
>       }
>   
>       pub fn try_shared_chunk_store_lock(&self) -> Result<ProcessLockSharedGuard, Error> {
> -        self.chunk_store.try_shared_lock()
> +        self.inner.chunk_store.try_shared_lock()
>       }
>   
>       pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) {
> -        self.chunk_store.chunk_path(digest)
> +        self.inner.chunk_store.chunk_path(digest)
>       }
>   
>       pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> {
> -        self.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
> +        self.inner.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
>       }
>   
>       pub fn insert_chunk(
> @@ -756,7 +864,7 @@ impl DataStore {
>           chunk: &DataBlob,
>           digest: &[u8; 32],
>       ) -> Result<(bool, u64), Error> {
> -        self.chunk_store.insert_chunk(chunk, digest)
> +        self.inner.chunk_store.insert_chunk(chunk, digest)
>       }
>   
>       pub fn load_blob(&self, backup_dir: &BackupDir, filename: &str) -> Result<DataBlob, Error> {
> @@ -772,13 +880,13 @@ impl DataStore {
>   
>   
>       pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> {
> -        let (chunk_path, _digest_str) = self.chunk_store.chunk_path(digest);
> +        let (chunk_path, _digest_str) = self.inner.chunk_store.chunk_path(digest);
>           std::fs::metadata(chunk_path).map_err(Error::from)
>       }
>   
>       pub fn load_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
>   
> -        let (chunk_path, digest_str) = self.chunk_store.chunk_path(digest);
> +        let (chunk_path, digest_str) = self.inner.chunk_store.chunk_path(digest);
>   
>           proxmox_lang::try_block!({
>               let mut file = std::fs::File::open(&chunk_path)?;
> @@ -892,7 +1000,7 @@ impl DataStore {
>       }
>   
>       pub fn verify_new(&self) -> bool {
> -        self.verify_new
> +        self.inner.verify_new
>       }
>   
>       /// returns a list of chunks sorted by their inode number on disk
> diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
> index d50a64a5..159642fd 100644
> --- a/pbs-datastore/src/lib.rs
> +++ b/pbs-datastore/src/lib.rs
> @@ -145,6 +145,9 @@
>   // Note: .pcat1 => Proxmox Catalog Format version 1
>   pub const CATALOG_NAME: &str = "catalog.pcat1.didx";
>   
> +/// Directory path where active operations counters are saved.
> +pub const ACTIVE_OPERATIONS_DIR: &str = concat!(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), "/active-operations");
> +
>   #[macro_export]
>   macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 {
>       () => {
> diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
> index 9eb20269..aa60d316 100644
> --- a/src/bin/proxmox-backup-api.rs
> +++ b/src/bin/proxmox-backup-api.rs
> @@ -73,6 +73,7 @@ async fn run() -> Result<(), Error> {
>   
>       proxmox_backup::server::create_run_dir()?;
>       proxmox_backup::server::create_state_dir()?;
> +    proxmox_backup::server::create_active_operations_dir()?;
>       proxmox_backup::server::jobstate::create_jobstate_dir()?;
>       proxmox_backup::tape::create_tape_status_dir()?;
>       proxmox_backup::tape::create_drive_state_dir()?;
> diff --git a/src/server/mod.rs b/src/server/mod.rs
> index deeb3398..ffb26f0e 100644
> --- a/src/server/mod.rs
> +++ b/src/server/mod.rs
> @@ -4,7 +4,7 @@
>   //! services. We want async IO, so this is built on top of
>   //! tokio/hyper.
>   
> -use anyhow::Error;
> +use anyhow::{format_err, Error};
>   use serde_json::Value;
>   
>   use proxmox::tools::fs::{create_path, CreateOptions};
> @@ -71,3 +71,17 @@ pub fn create_state_dir() -> Result<(), Error> {
>       create_path(pbs_buildcfg::PROXMOX_BACKUP_STATE_DIR_M!(), None, Some(opts))?;
>       Ok(())
>   }
> +
> +/// Create active operations dir with correct permission.
> +pub fn create_active_operations_dir() -> Result<(), Error> {
> +    let backup_user = pbs_config::backup_user()?;
> +    let mode = nix::sys::stat::Mode::from_bits_truncate(0o0750);
> +    let options = CreateOptions::new()
> +        .perm(mode)
> +        .owner(backup_user.uid)
> +        .group(backup_user.gid);
> +
> +    create_path(pbs_datastore::ACTIVE_OPERATIONS_DIR, None, Some(options))
> +        .map_err(|err: Error| format_err!("unable to create active operations dir - {}", err))?;
> +    Ok(())
> +}
> 

Thread overview: 9+ messages
2021-11-12 12:30 [pbs-devel] [PATCH v4 proxmox-backup 0/6] closes #3071: maintenance mode for datastore Hannes Laimer
2021-11-12 12:30 ` [pbs-devel] [PATCH v4 proxmox-backup 1/6] pbs-api-types: add maintenance type Hannes Laimer
2021-11-12 12:30 ` [pbs-devel] [PATCH v4 proxmox-backup 2/6] pbs-datastore: add check for maintenance in lookup Hannes Laimer
2021-11-12 12:30 ` [pbs-devel] [PATCH v4 proxmox-backup 3/6] pbs-datastore: add active operations tracking Hannes Laimer
2021-11-19 15:21   ` Dominik Csapak [this message]
2021-11-12 12:30 ` [pbs-devel] [PATCH v4 proxmox-backup 4/6] api2: make maintenance_type updatable Hannes Laimer
2021-11-12 12:30 ` [pbs-devel] [PATCH v4 proxmox-backup 5/6] api2: add get_active_operations endpoint Hannes Laimer
2021-11-12 12:30 ` [pbs-devel] [PATCH v4 proxmox-backup 6/6] ui: add option to change the maintenance type Hannes Laimer
2021-11-22  7:28 ` [pbs-devel] [PATCH v4 proxmox-backup 0/6] closes #3071: maintenance mode for datastore Thomas Lamprecht
