From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id BF06E60F24 for ; Fri, 25 Sep 2020 16:14:06 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id B55471C256 for ; Fri, 25 Sep 2020 16:13:36 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 4396E1C18C for ; Fri, 25 Sep 2020 16:13:30 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 0FFE4456A3 for ; Fri, 25 Sep 2020 16:13:30 +0200 (CEST) From: Dominik Csapak To: pbs-devel@lists.proxmox.com Date: Fri, 25 Sep 2020 16:13:20 +0200 Message-Id: <20200925141327.25024-8-d.csapak@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20200925141327.25024-1-d.csapak@proxmox.com> References: <20200925141327.25024-1-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL -0.166 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment KAM_LAZY_DOMAIN_SECURITY 1 Sending domain does not have any anti-forgery methods NO_DNS_FOR_FROM 0.379 Envelope sender has no MX or A DNS records RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_NONE 0.001 SPF: sender does not publish an SPF Record Subject: [pbs-devel] [PATCH proxmox-backup 04/10] server/worker_task: split task list file into two X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Fri, 25 Sep 2020 14:14:06 -0000 one for only the active tasks and one for up to 1000 finished tasks factor out the parsing of a task file (we will later need this again) and use iterator combinators for easier code we now sort the tasks ascending (this will become important in a later patch) but reverse (for now) it to keep compatibility this code also omits the converting into an intermittent hash since it cannot really happen that we have duplicate tasks in this list (since the call is locked by an flock, and it is the only place where we write into the lists) Signed-off-by: Dominik Csapak --- src/server/worker_task.rs | 143 +++++++++++++++++++++----------------- 1 file changed, 79 insertions(+), 64 deletions(-) diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs index 3b47dfb6..9152609e 100644 --- a/src/server/worker_task.rs +++ b/src/server/worker_task.rs @@ -31,6 +31,9 @@ pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!(); pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!(); pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock"); pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/active"); +pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/index"); + +const MAX_INDEX_TASKS: usize = 1000; lazy_static! { static ref WORKER_TASK_LIST: Mutex>> = Mutex::new(HashMap::new()); @@ -347,76 +350,46 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result, E let lock = lock_task_list_files(true)?; - let reader = match File::open(PROXMOX_BACKUP_ACTIVE_TASK_FN) { - Ok(f) => Some(BufReader::new(f)), - Err(err) => { - if err.kind() == std::io::ErrorKind::NotFound { - None - } else { - bail!("unable to open active worker {:?} - {}", PROXMOX_BACKUP_ACTIVE_TASK_FN, err); + let mut finish_list: Vec = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?; + let mut active_list: Vec = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)? + .into_iter() + .filter_map(|info| { + if info.state.is_some() { // should not happen? + finish_list.push(info); + return None; } - } - }; - let mut active_list = vec![]; - let mut finish_list = vec![]; - - if let Some(lines) = reader.map(|r| r.lines()) { - - for line in lines { - let line = line?; - match parse_worker_status_line(&line) { - Err(err) => bail!("unable to parse active worker status '{}' - {}", line, err), - Ok((upid_str, upid, state)) => match state { - None if worker_is_active_local(&upid) => { - active_list.push(TaskListInfo { upid, upid_str, state: None }); - }, - None => { - println!("Detected stopped UPID {}", upid_str); - let now = proxmox::tools::time::epoch_i64(); - let status = upid_read_status(&upid) - .unwrap_or_else(|_| TaskState::Unknown { endtime: now }); - finish_list.push(TaskListInfo { - upid, upid_str, state: Some(status) - }); - }, - Some(status) => { - finish_list.push(TaskListInfo { - upid, upid_str, state: Some(status) - }) - } - } + if !worker_is_active_local(&info.upid) { + println!("Detected stopped UPID {}", &info.upid_str); + let now = proxmox::tools::time::epoch_i64(); + let status = upid_read_status(&info.upid) + .unwrap_or_else(|_| TaskState::Unknown { endtime: now }); + finish_list.push(TaskListInfo { + upid: info.upid, + upid_str: info.upid_str, + state: Some(status) + }); + return None; } - } - } + + Some(info) + }).collect(); if let Some(upid) = new_upid { active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }); } - // assemble list without duplicates - // we include all active tasks, - // and fill up to 1000 entries with finished tasks + let active_raw = render_task_list(&active_list); - let max = 1000; - - let mut task_hash = HashMap::new(); - - for info in active_list { - task_hash.insert(info.upid_str.clone(), info); - } - - for info in finish_list { - if task_hash.len() > max { break; } - if !task_hash.contains_key(&info.upid_str) { - task_hash.insert(info.upid_str.clone(), info); - } - } - - let mut task_list: Vec = vec![]; - for (_, info) in task_hash { task_list.push(info); } + replace_file( + PROXMOX_BACKUP_ACTIVE_TASK_FN, + active_raw.as_bytes(), + CreateOptions::new() + .owner(backup_user.uid) + .group(backup_user.gid), + )?; - task_list.sort_unstable_by(|b, a| { // lastest on top + finish_list.sort_unstable_by(|a, b| { match (&a.state, &b.state) { (Some(s1), Some(s2)) => s1.cmp(&s2), (Some(_), None) => std::cmp::Ordering::Less, @@ -425,11 +398,13 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result, E } }); - let raw = render_task_list(&task_list[..]); + let start = (finish_list.len()-MAX_INDEX_TASKS).max(0); + let end = (start+MAX_INDEX_TASKS).min(finish_list.len()); + let index_raw = render_task_list(&finish_list[start..end]); replace_file( - PROXMOX_BACKUP_ACTIVE_TASK_FN, - raw.as_bytes(), + PROXMOX_BACKUP_INDEX_TASK_FN, + index_raw.as_bytes(), CreateOptions::new() .owner(backup_user.uid) .group(backup_user.gid), @@ -437,7 +412,9 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result, E drop(lock); - Ok(task_list) + finish_list.append(&mut active_list); + finish_list.reverse(); + Ok(finish_list) } /// Returns a sorted list of known tasks @@ -467,6 +444,44 @@ fn render_task_list(list: &[TaskListInfo]) -> String { raw } +// note this is not locked, caller has to make sure it is +// this will skip (and log) lines that are not valid status lines +fn read_task_file(reader: R) -> Result, Error> +{ + let reader = BufReader::new(reader); + let mut list = Vec::new(); + for line in reader.lines() { + let line = line?; + match parse_worker_status_line(&line) { + Ok((upid_str, upid, state)) => list.push(TaskListInfo { + upid_str, + upid, + state + }), + Err(err) => { + eprintln!("unable to parse worker status '{}' - {}", line, err); + continue; + } + }; + } + + Ok(list) +} + +// note this is not locked, caller has to make sure it is +fn read_task_file_from_path

(path: P) -> Result, Error> +where + P: AsRef + std::fmt::Debug, +{ + let file = match File::open(&path) { + Ok(f) => f, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()), + Err(err) => bail!("unable to open task list {:?} - {}", path, err), + }; + + read_task_file(file) +} + /// Launch long running worker tasks. /// /// A worker task can either be a whole thread, or a simply tokio -- 2.20.1