public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 04/10] server/worker_task: split task list file into two
Date: Fri, 25 Sep 2020 16:13:20 +0200	[thread overview]
Message-ID: <20200925141327.25024-8-d.csapak@proxmox.com> (raw)
In-Reply-To: <20200925141327.25024-1-d.csapak@proxmox.com>

one for only the active tasks and one for up to 1000 finished tasks

factor out the parsing of a task file (we will later need this again)
and use iterator combinators for easier code

we now sort the tasks ascending (this will become important in a later patch)
but reverse (for now) it to keep compatibility

this code also omits the converting into an intermittent hash
since it cannot really happen that we have duplicate tasks in this list
(since the call is locked by an flock, and it is the only place where we
write into the lists)

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/server/worker_task.rs | 143 +++++++++++++++++++++-----------------
 1 file changed, 79 insertions(+), 64 deletions(-)

diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index 3b47dfb6..9152609e 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -31,6 +31,9 @@ pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!();
 pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!();
 pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock");
 pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/active");
+pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/index");
+
+const MAX_INDEX_TASKS: usize = 1000;
 
 lazy_static! {
     static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
@@ -347,76 +350,46 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
 
     let lock = lock_task_list_files(true)?;
 
-    let reader = match File::open(PROXMOX_BACKUP_ACTIVE_TASK_FN) {
-        Ok(f) => Some(BufReader::new(f)),
-        Err(err) => {
-            if err.kind() ==  std::io::ErrorKind::NotFound {
-                 None
-            } else {
-                bail!("unable to open active worker {:?} - {}", PROXMOX_BACKUP_ACTIVE_TASK_FN, err);
+    let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
+    let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
+        .into_iter()
+        .filter_map(|info| {
+            if info.state.is_some() { // should not happen?
+                finish_list.push(info);
+                return None;
             }
-        }
-    };
 
-    let mut active_list = vec![];
-    let mut finish_list = vec![];
-
-    if let Some(lines) = reader.map(|r| r.lines()) {
-
-        for line in lines {
-            let line = line?;
-            match parse_worker_status_line(&line) {
-                Err(err) => bail!("unable to parse active worker status '{}' - {}", line, err),
-                Ok((upid_str, upid, state)) => match state {
-                    None if worker_is_active_local(&upid) => {
-                        active_list.push(TaskListInfo { upid, upid_str, state: None });
-                    },
-                    None => {
-                        println!("Detected stopped UPID {}", upid_str);
-                        let now = proxmox::tools::time::epoch_i64();
-                        let status = upid_read_status(&upid)
-                            .unwrap_or_else(|_| TaskState::Unknown { endtime: now });
-                        finish_list.push(TaskListInfo {
-                            upid, upid_str, state: Some(status)
-                        });
-                    },
-                    Some(status) => {
-                        finish_list.push(TaskListInfo {
-                            upid, upid_str, state: Some(status)
-                        })
-                    }
-                }
+            if !worker_is_active_local(&info.upid) {
+                println!("Detected stopped UPID {}", &info.upid_str);
+                let now = proxmox::tools::time::epoch_i64();
+                let status = upid_read_status(&info.upid)
+                    .unwrap_or_else(|_| TaskState::Unknown { endtime: now });
+                finish_list.push(TaskListInfo {
+                    upid: info.upid,
+                    upid_str: info.upid_str,
+                    state: Some(status)
+                });
+                return None;
             }
-        }
-    }
+
+            Some(info)
+        }).collect();
 
     if let Some(upid) = new_upid {
         active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
     }
 
-    // assemble list without duplicates
-    // we include all active tasks,
-    // and fill up to 1000 entries with finished tasks
+    let active_raw = render_task_list(&active_list);
 
-    let max = 1000;
-
-    let mut task_hash = HashMap::new();
-
-    for info in active_list {
-        task_hash.insert(info.upid_str.clone(), info);
-    }
-
-    for info in finish_list {
-        if task_hash.len() > max { break; }
-        if !task_hash.contains_key(&info.upid_str) {
-            task_hash.insert(info.upid_str.clone(), info);
-        }
-    }
-
-    let mut task_list: Vec<TaskListInfo> = vec![];
-    for (_, info) in task_hash { task_list.push(info); }
+    replace_file(
+        PROXMOX_BACKUP_ACTIVE_TASK_FN,
+        active_raw.as_bytes(),
+        CreateOptions::new()
+            .owner(backup_user.uid)
+            .group(backup_user.gid),
+    )?;
 
-    task_list.sort_unstable_by(|b, a| { // lastest on top
+    finish_list.sort_unstable_by(|a, b| {
         match (&a.state, &b.state) {
             (Some(s1), Some(s2)) => s1.cmp(&s2),
             (Some(_), None) => std::cmp::Ordering::Less,
@@ -425,11 +398,13 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
         }
     });
 
-    let raw = render_task_list(&task_list[..]);
+    let start = (finish_list.len()-MAX_INDEX_TASKS).max(0);
+    let end = (start+MAX_INDEX_TASKS).min(finish_list.len());
+    let index_raw = render_task_list(&finish_list[start..end]);
 
     replace_file(
-        PROXMOX_BACKUP_ACTIVE_TASK_FN,
-        raw.as_bytes(),
+        PROXMOX_BACKUP_INDEX_TASK_FN,
+        index_raw.as_bytes(),
         CreateOptions::new()
             .owner(backup_user.uid)
             .group(backup_user.gid),
@@ -437,7 +412,9 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
 
     drop(lock);
 
-    Ok(task_list)
+    finish_list.append(&mut active_list);
+    finish_list.reverse();
+    Ok(finish_list)
 }
 
 /// Returns a sorted list of known tasks
@@ -467,6 +444,44 @@ fn render_task_list(list: &[TaskListInfo]) -> String {
     raw
 }
 
+// note this is not locked, caller has to make sure it is
+// this will skip (and log) lines that are not valid status lines
+fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
+{
+    let reader = BufReader::new(reader);
+    let mut list = Vec::new();
+    for line in reader.lines() {
+        let line = line?;
+        match parse_worker_status_line(&line) {
+            Ok((upid_str, upid, state)) => list.push(TaskListInfo {
+                upid_str,
+                upid,
+                state
+            }),
+            Err(err) => {
+                eprintln!("unable to parse worker status '{}' - {}", line, err);
+                continue;
+            }
+        };
+    }
+
+    Ok(list)
+}
+
+// note this is not locked, caller has to make sure it is
+fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error>
+where
+    P: AsRef<std::path::Path> + std::fmt::Debug,
+{
+    let file = match File::open(&path) {
+        Ok(f) => f,
+        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
+        Err(err) => bail!("unable to open task list {:?} - {}", path, err),
+    };
+
+    read_task_file(file)
+}
+
 /// Launch long running worker tasks.
 ///
 /// A worker task can either be a whole thread, or a simply tokio
-- 
2.20.1





  parent reply	other threads:[~2020-09-25 14:14 UTC|newest]

Thread overview: 23+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-09-25 14:13 [pbs-devel] [PATCH proxmox/proxmox-backup/widget-toolkit] improve task list handling Dominik Csapak
2020-09-25 14:13 ` [pbs-devel] [PATCH proxmox 1/3] proxmox/tools/fs: add shared lock helper Dominik Csapak
2020-09-28  5:10   ` [pbs-devel] applied: " Dietmar Maurer
2020-09-25 14:13 ` [pbs-devel] [PATCH proxmox 2/3] proxmox/tools/fs: create tmpfile helper Dominik Csapak
2020-09-28  5:10   ` [pbs-devel] applied: " Dietmar Maurer
2020-09-25 14:13 ` [pbs-devel] [PATCH proxmox 3/3] proxmox/tools: add logrotate module Dominik Csapak
2020-09-28  5:12   ` Dietmar Maurer
2020-09-25 14:13 ` [pbs-devel] [PATCH proxmox-backup 01/10] api2/node/tasks: move userfilter to function signature Dominik Csapak
2020-09-28  5:18   ` [pbs-devel] applied: " Dietmar Maurer
2020-09-25 14:13 ` [pbs-devel] [PATCH proxmox-backup 02/10] server/worker_task: refactor locking of the task list Dominik Csapak
2020-09-28  5:28   ` Dietmar Maurer
2020-09-25 14:13 ` [pbs-devel] [PATCH proxmox-backup 03/10] server/worker_task: factor out task list rendering Dominik Csapak
2020-09-28  5:31   ` [pbs-devel] applied: " Dietmar Maurer
2020-09-25 14:13 ` Dominik Csapak [this message]
2020-09-28  5:43   ` [pbs-devel] [PATCH proxmox-backup 04/10] server/worker_task: split task list file into two Dietmar Maurer
2020-09-25 14:13 ` [pbs-devel] [PATCH proxmox-backup 05/10] server/worker_task: write older tasks into archive file Dominik Csapak
2020-09-25 14:13 ` [pbs-devel] [PATCH proxmox-backup 06/10] server/worker_task: add TaskListInfoIterator Dominik Csapak
2020-09-25 14:13 ` [pbs-devel] [PATCH proxmox-backup 07/10] api2/node/tasks: use TaskListInfoIterator instead of read_task_list Dominik Csapak
2020-09-25 14:13 ` [pbs-devel] [PATCH proxmox-backup 08/10] api2/status: use the TaskListInfoIterator here Dominik Csapak
2020-09-25 14:13 ` [pbs-devel] [PATCH proxmox-backup 09/10] server/worker_task: remove unecessary read_task_list Dominik Csapak
2020-09-25 14:13 ` [pbs-devel] [PATCH proxmox-backup 10/10] proxmox-backup-proxy: add task archive rotation Dominik Csapak
2020-09-25 14:13 ` [pbs-devel] [PATCH widget-toolkit 1/1] node/Tasks: improve scroller behaviour on datastore loading Dominik Csapak
2020-09-29  7:19   ` [pbs-devel] applied: " Thomas Lamprecht

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20200925141327.25024-8-d.csapak@proxmox.com \
    --to=d.csapak@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal