From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id B7F546C6AB for ; Thu, 23 Sep 2021 12:13:41 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id B369223E87 for ; Thu, 23 Sep 2021 12:13:41 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 587F523E37 for ; Thu, 23 Sep 2021 12:13:35 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 2FA3244A81; Thu, 23 Sep 2021 12:13:35 +0200 (CEST) From: Dietmar Maurer To: pbs-devel@lists.proxmox.com Date: Thu, 23 Sep 2021 12:13:25 +0200 Message-Id: <20210923101329.950146-2-dietmar@proxmox.com> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20210923101329.950146-1-dietmar@proxmox.com> References: <20210923101329.950146-1-dietmar@proxmox.com> MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.430 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment POISEN_SPAM_PILL 0.1 Meta: its spam POISEN_SPAM_PILL_2 0.1 random spam to be learned in bayes POISEN_SPAM_PILL_4 0.1 random spam to be learned in bayes SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Thu, 23 Sep 2021 10:13:41 -0000 And application now needs to call init_worker_tasks() before using worker tasks. Notable changes: - need to call init_worker_tasks() before using worker tasks. - create_task_log_dirs() ís called inside init_worker_tasks() - removed UpidExt trait - use atomic_open_or_create_file() - remove pbs_config and pbs_buildcfg dependency --- src/api2/node/tasks.rs | 6 +- src/bin/proxmox-backup-api.rs | 7 +- src/bin/proxmox-backup-proxy.rs | 5 +- src/server/mod.rs | 3 - src/server/upid.rs | 18 -- src/server/worker_task.rs | 475 +++++++++++++++++++------------- 6 files changed, 290 insertions(+), 224 deletions(-) delete mode 100644 src/server/upid.rs diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs index 169a3d4d..df4673a1 100644 --- a/src/api2/node/tasks.rs +++ b/src/api2/node/tasks.rs @@ -16,7 +16,7 @@ use pbs_api_types::{ }; use crate::api2::pull::check_pull_privs; -use crate::server::{self, UPIDExt, TaskState, TaskListInfoIterator}; +use crate::server::{self, upid_log_path, upid_read_status, TaskState, TaskListInfoIterator}; use pbs_config::CachedUserInfo; // matches respective job execution privileges @@ -220,7 +220,7 @@ async fn get_task_status( if crate::server::worker_is_active(&upid).await? { result["status"] = Value::from("running"); } else { - let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 }); + let exitstatus = upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 }); result["status"] = Value::from("stopped"); result["exitstatus"] = Value::from(exitstatus.to_string()); }; @@ -287,7 +287,7 @@ async fn read_task_log( let mut count: u64 = 0; - let path = upid.log_path(); + let path = upid_log_path(&upid)?; let file = File::open(path)?; diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs index 9ad10260..9901b85d 100644 --- a/src/bin/proxmox-backup-api.rs +++ b/src/bin/proxmox-backup-api.rs @@ -54,8 +54,6 @@ async fn run() -> Result<(), Error> { bail!("unable to inititialize syslog - {}", err); } - server::create_task_log_dirs()?; - config::create_configdir()?; config::update_self_signed_cert(false)?; @@ -102,13 +100,14 @@ async fn run() -> Result<(), Error> { config.enable_auth_log( pbs_buildcfg::API_AUTH_LOG_FN, - Some(dir_opts), - Some(file_opts), + Some(dir_opts.clone()), + Some(file_opts.clone()), &mut commando_sock, )?; let rest_server = RestServer::new(config); + proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?; // http server future: let server = daemon::create_daemon( diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index 518054bf..5d8ed189 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -202,12 +202,13 @@ async fn run() -> Result<(), Error> { config.enable_auth_log( pbs_buildcfg::API_AUTH_LOG_FN, - Some(dir_opts), - Some(file_opts), + Some(dir_opts.clone()), + Some(file_opts.clone()), &mut commando_sock, )?; let rest_server = RestServer::new(config); + proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?; //openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes diff --git a/src/server/mod.rs b/src/server/mod.rs index a7dcee67..77320da6 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -46,9 +46,6 @@ pub fn our_ctrl_sock() -> String { ctrl_sock_from_pid(*PID) } -mod upid; -pub use upid::*; - mod worker_task; pub use worker_task::*; diff --git a/src/server/upid.rs b/src/server/upid.rs deleted file mode 100644 index 70a3e3fb..00000000 --- a/src/server/upid.rs +++ /dev/null @@ -1,18 +0,0 @@ -pub trait UPIDExt: private::Sealed { - /// Returns the absolute path to the task log file - fn log_path(&self) -> std::path::PathBuf; -} - -mod private { - pub trait Sealed {} - impl Sealed for pbs_api_types::UPID {} -} - -impl UPIDExt for pbs_api_types::UPID { - fn log_path(&self) -> std::path::PathBuf { - let mut path = std::path::PathBuf::from(super::PROXMOX_BACKUP_TASK_DIR); - path.push(format!("{:02X}", self.pstart % 256)); - path.push(self.to_string()); - path - } -} diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs index 92ab50d7..191d8a44 100644 --- a/src/server/worker_task.rs +++ b/src/server/worker_task.rs @@ -1,5 +1,6 @@ use std::collections::{HashMap, VecDeque}; use std::fs::File; +use std::path::PathBuf; use std::io::{Read, Write, BufRead, BufReader}; use std::panic::UnwindSafe; use std::sync::atomic::{AtomicBool, Ordering}; @@ -11,27 +12,267 @@ use lazy_static::lazy_static; use serde_json::{json, Value}; use serde::{Serialize, Deserialize}; use tokio::sync::oneshot; +use nix::fcntl::OFlag; +use once_cell::sync::OnceCell; use proxmox::sys::linux::procfs; use proxmox::try_block; -use proxmox::tools::fs::{create_path, replace_file, CreateOptions}; +use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions}; -use pbs_buildcfg; use pbs_tools::logrotate::{LogRotate, LogRotateFiles}; use pbs_api_types::UPID; -use pbs_config::{open_backup_lockfile, BackupLockGuard}; use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions}; -use super::UPIDExt; +struct TaskListLockGuard(File); -macro_rules! taskdir { - ($subdir:expr) => (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir)) +struct WorkerTaskSetup { + file_opts: CreateOptions, + taskdir: PathBuf, + task_lock_fn: PathBuf, + active_tasks_fn: PathBuf, + task_index_fn: PathBuf, + task_archive_fn: PathBuf, +} + +static WORKER_TASK_SETUP: OnceCell = OnceCell::new(); + +fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> { + WORKER_TASK_SETUP.get() + .ok_or_else(|| format_err!("WorkerTask library is not initialized")) +} + +impl WorkerTaskSetup { + + fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self { + + let mut taskdir = basedir.clone(); + taskdir.push("tasks"); + + let mut task_lock_fn = taskdir.clone(); + task_lock_fn.push(".active.lock"); + + let mut active_tasks_fn = taskdir.clone(); + active_tasks_fn.push("active"); + + let mut task_index_fn = taskdir.clone(); + task_index_fn.push("index"); + + let mut task_archive_fn = taskdir.clone(); + task_archive_fn.push("archive"); + + Self { + file_opts, + taskdir, + task_lock_fn, + active_tasks_fn, + task_index_fn, + task_archive_fn, + } + } + + fn lock_task_list_files(&self, exclusive: bool) -> Result { + let options = self.file_opts.clone() + .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); + + let timeout = std::time::Duration::new(10, 0); + + let file = proxmox::tools::fs::open_file_locked( + &self.task_lock_fn, + timeout, + exclusive, + options, + )?; + + Ok(TaskListLockGuard(file)) + } + + fn log_path(&self, upid: &UPID) -> std::path::PathBuf { + let mut path = self.taskdir.clone(); + path.push(format!("{:02X}", upid.pstart % 256)); + path.push(upid.to_string()); + path + } + + // atomically read/update the task list, update status of finished tasks + // new_upid is added to the list when specified. + fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> { + + let lock = self.lock_task_list_files(true)?; + + // TODO remove with 1.x + let mut finish_list: Vec = read_task_file_from_path(&self.task_index_fn)?; + let had_index_file = !finish_list.is_empty(); + + // We use filter_map because one negative case wants to *move* the data into `finish_list`, + // clippy doesn't quite catch this! + #[allow(clippy::unnecessary_filter_map)] + let mut active_list: Vec = read_task_file_from_path(&self.active_tasks_fn)? + .into_iter() + .filter_map(|info| { + if info.state.is_some() { + // this can happen when the active file still includes finished tasks + finish_list.push(info); + return None; + } + + if !worker_is_active_local(&info.upid) { + // println!("Detected stopped task '{}'", &info.upid_str); + let now = proxmox::tools::time::epoch_i64(); + let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now }); + finish_list.push(TaskListInfo { + upid: info.upid, + upid_str: info.upid_str, + state: Some(status) + }); + return None; + } + + Some(info) + }).collect(); + + if let Some(upid) = new_upid { + active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }); + } + + let active_raw = render_task_list(&active_list); + + let options = self.file_opts.clone() + .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); + + replace_file( + &self.active_tasks_fn, + active_raw.as_bytes(), + options, + )?; + + finish_list.sort_unstable_by(|a, b| { + match (&a.state, &b.state) { + (Some(s1), Some(s2)) => s1.cmp(&s2), + (Some(_), None) => std::cmp::Ordering::Less, + (None, Some(_)) => std::cmp::Ordering::Greater, + _ => a.upid.starttime.cmp(&b.upid.starttime), + } + }); + + if !finish_list.is_empty() { + let options = self.file_opts.clone() + .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); + + let mut writer = atomic_open_or_create_file( + &self.task_archive_fn, + OFlag::O_APPEND | OFlag::O_RDWR, + &[], + options, + )?; + for info in &finish_list { + writer.write_all(render_task_line(&info).as_bytes())?; + } + } + + // TODO Remove with 1.x + // for compatibility, if we had an INDEX file, we do not need it anymore + if had_index_file { + let _ = nix::unistd::unlink(&self.task_index_fn); + } + + drop(lock); + + Ok(()) + } + + // Create task log directory with correct permissions + fn create_task_log_dirs(&self) -> Result<(), Error> { + + try_block!({ + let dir_opts = self.file_opts.clone() + .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); + + create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?; + // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?; + Ok(()) + }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err)) + } +} + +/// Initialize the WorkerTask library +pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> { + let setup = WorkerTaskSetup::new(basedir, file_opts); + setup.create_task_log_dirs()?; + WORKER_TASK_SETUP.set(setup) + .map_err(|_| format_err!("init_worker_tasks failed - already initialized")) +} + +/// checks if the Task Archive is bigger that 'size_threshold' bytes, and +/// rotates it if it is +pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option) -> Result { + + let setup = worker_task_setup()?; + + let _lock = setup.lock_task_list_files(true)?; + + let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress) + .ok_or_else(|| format_err!("could not get archive file names"))?; + + logrotate.rotate(size_threshold, None, max_files) +} + + +/// Path to the worker log file +pub fn upid_log_path(upid: &UPID) -> Result { + let setup = worker_task_setup()?; + Ok(setup.log_path(upid)) +} + +/// Read endtime (time of last log line) and exitstatus from task log file +/// If there is not a single line with at valid datetime, we assume the +/// starttime to be the endtime +pub fn upid_read_status(upid: &UPID) -> Result { + + let setup = worker_task_setup()?; + + let mut status = TaskState::Unknown { endtime: upid.starttime }; + + let path = setup.log_path(upid); + + let mut file = File::open(path)?; + + /// speedup - only read tail + use std::io::Seek; + use std::io::SeekFrom; + let _ = file.seek(SeekFrom::End(-8192)); // ignore errors + + let mut data = Vec::with_capacity(8192); + file.read_to_end(&mut data)?; + + // strip newlines at the end of the task logs + while data.last() == Some(&b'\n') { + data.pop(); + } + + let last_line = match data.iter().rposition(|c| *c == b'\n') { + Some(start) if data.len() > (start+1) => &data[start+1..], + Some(_) => &data, // should not happen, since we removed all trailing newlines + None => &data, + }; + + let last_line = std::str::from_utf8(last_line) + .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?; + + let mut iter = last_line.splitn(2, ": "); + if let Some(time_str) = iter.next() { + if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) { + // set the endtime even if we cannot parse the state + status = TaskState::Unknown { endtime }; + if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) { + if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) { + status = state; + } + } + } + } + + Ok(status) } -pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/"); -pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock"); -pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active"); -pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index"); -pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive"); lazy_static! { static ref WORKER_TASK_LIST: Mutex>> = Mutex::new(HashMap::new()); @@ -152,73 +393,6 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option Result<(), Error> { - - try_block!({ - let backup_user = pbs_config::backup_user()?; - let opts = CreateOptions::new() - .owner(backup_user.uid) - .group(backup_user.gid); - - create_path(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?; - create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?; - create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?; - Ok(()) - }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?; - - Ok(()) -} - -/// Read endtime (time of last log line) and exitstatus from task log file -/// If there is not a single line with at valid datetime, we assume the -/// starttime to be the endtime -pub fn upid_read_status(upid: &UPID) -> Result { - - let mut status = TaskState::Unknown { endtime: upid.starttime }; - - let path = upid.log_path(); - - let mut file = File::open(path)?; - - /// speedup - only read tail - use std::io::Seek; - use std::io::SeekFrom; - let _ = file.seek(SeekFrom::End(-8192)); // ignore errors - - let mut data = Vec::with_capacity(8192); - file.read_to_end(&mut data)?; - - // strip newlines at the end of the task logs - while data.last() == Some(&b'\n') { - data.pop(); - } - - let last_line = match data.iter().rposition(|c| *c == b'\n') { - Some(start) if data.len() > (start+1) => &data[start+1..], - Some(_) => &data, // should not happen, since we removed all trailing newlines - None => &data, - }; - - let last_line = std::str::from_utf8(last_line) - .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?; - - let mut iter = last_line.splitn(2, ": "); - if let Some(time_str) = iter.next() { - if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) { - // set the endtime even if we cannot parse the state - status = TaskState::Unknown { endtime }; - if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) { - if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) { - status = state; - } - } - } - } - - Ok(status) -} - /// Task State #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub enum TaskState { @@ -323,107 +497,6 @@ impl Into for TaskListInfo { } } -fn lock_task_list_files(exclusive: bool) -> Result { - open_backup_lockfile(PROXMOX_BACKUP_TASK_LOCK_FN, None, exclusive) -} - -/// checks if the Task Archive is bigger that 'size_threshold' bytes, and -/// rotates it if it is -pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option) -> Result { - let _lock = lock_task_list_files(true)?; - - let mut logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, compress) - .ok_or_else(|| format_err!("could not get archive file names"))?; - - logrotate.rotate(size_threshold, None, max_files) -} - -// atomically read/update the task list, update status of finished tasks -// new_upid is added to the list when specified. -fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> { - - let backup_user = pbs_config::backup_user()?; - - let lock = lock_task_list_files(true)?; - - // TODO remove with 1.x - let mut finish_list: Vec = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?; - let had_index_file = !finish_list.is_empty(); - - // We use filter_map because one negative case wants to *move* the data into `finish_list`, - // clippy doesn't quite catch this! - #[allow(clippy::unnecessary_filter_map)] - let mut active_list: Vec = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)? - .into_iter() - .filter_map(|info| { - if info.state.is_some() { - // this can happen when the active file still includes finished tasks - finish_list.push(info); - return None; - } - - if !worker_is_active_local(&info.upid) { - // println!("Detected stopped task '{}'", &info.upid_str); - let now = proxmox::tools::time::epoch_i64(); - let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now }); - finish_list.push(TaskListInfo { - upid: info.upid, - upid_str: info.upid_str, - state: Some(status) - }); - return None; - } - - Some(info) - }).collect(); - - if let Some(upid) = new_upid { - active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }); - } - - let active_raw = render_task_list(&active_list); - - replace_file( - PROXMOX_BACKUP_ACTIVE_TASK_FN, - active_raw.as_bytes(), - CreateOptions::new() - .owner(backup_user.uid) - .group(backup_user.gid), - )?; - - finish_list.sort_unstable_by(|a, b| { - match (&a.state, &b.state) { - (Some(s1), Some(s2)) => s1.cmp(&s2), - (Some(_), None) => std::cmp::Ordering::Less, - (None, Some(_)) => std::cmp::Ordering::Greater, - _ => a.upid.starttime.cmp(&b.upid.starttime), - } - }); - - if !finish_list.is_empty() { - match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) { - Ok(mut writer) => { - for info in &finish_list { - writer.write_all(render_task_line(&info).as_bytes())?; - } - }, - Err(err) => bail!("could not write task archive - {}", err), - } - - nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?; - } - - // TODO Remove with 1.x - // for compatibility, if we had an INDEX file, we do not need it anymore - if had_index_file { - let _ = nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN); - } - - drop(lock); - - Ok(()) -} - fn render_task_line(info: &TaskListInfo) -> String { let mut raw = String::new(); if let Some(status) = &info.state { @@ -486,27 +559,30 @@ pub struct TaskListInfoIterator { list: VecDeque, end: bool, archive: Option, - lock: Option, + lock: Option, } impl TaskListInfoIterator { pub fn new(active_only: bool) -> Result { + + let setup = worker_task_setup()?; + let (read_lock, active_list) = { - let lock = lock_task_list_files(false)?; - let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?; + let lock = setup.lock_task_list_files(false)?; + let active_list = read_task_file_from_path(&setup.active_tasks_fn)?; let needs_update = active_list .iter() .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid)); // TODO remove with 1.x - let index_exists = std::path::Path::new(PROXMOX_BACKUP_INDEX_TASK_FN).is_file(); + let index_exists = setup.task_index_fn.is_file(); if needs_update || index_exists { drop(lock); - update_active_workers(None)?; - let lock = lock_task_list_files(false)?; - let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?; + setup.update_active_workers(None)?; + let lock = setup.lock_task_list_files(false)?; + let active_list = read_task_file_from_path(&setup.active_tasks_fn)?; (lock, active_list) } else { (lock, active_list) @@ -516,7 +592,7 @@ impl TaskListInfoIterator { let archive = if active_only { None } else { - let logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, true) + let logrotate = LogRotate::new(&setup.task_archive_fn, true) .ok_or_else(|| format_err!("could not get archive file names"))?; Some(logrotate.files()) }; @@ -568,6 +644,7 @@ impl Iterator for TaskListInfoIterator { /// persistently to files. Task should poll the `abort_requested` /// flag, and stop execution when requested. pub struct WorkerTask { + setup: &'static WorkerTaskSetup, upid: UPID, data: Mutex, abort_requested: AtomicBool, @@ -589,17 +666,26 @@ struct WorkerTaskData { impl WorkerTask { - pub fn new(worker_type: &str, worker_id: Option, auth_id: String, to_stdout: bool) -> Result, Error> { + pub fn new( + worker_type: &str, + worker_id: Option, + auth_id: String, + to_stdout: bool, + ) -> Result, Error> { + + let setup = worker_task_setup()?; + let upid = UPID::new(worker_type, worker_id, auth_id)?; let task_id = upid.task_id; - let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR); + let mut path = setup.taskdir.clone(); path.push(format!("{:02X}", upid.pstart & 255)); - let backup_user = pbs_config::backup_user()?; + let dir_opts = setup.file_opts.clone() + .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); - create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?; + create_path(&path, None, Some(dir_opts))?; path.push(upid.to_string()); @@ -608,12 +694,13 @@ impl WorkerTask { exclusive: true, prefix_time: true, read: true, + file_opts: setup.file_opts.clone(), ..Default::default() }; let logger = FileLogger::new(&path, logger_options)?; - nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?; let worker = Arc::new(Self { + setup, upid: upid.clone(), abort_requested: AtomicBool::new(false), data: Mutex::new(WorkerTaskData { @@ -631,7 +718,7 @@ impl WorkerTask { proxmox_rest_server::set_worker_count(hash.len()); } - update_active_workers(Some(&upid))?; + setup.update_active_workers(Some(&upid))?; Ok(worker) } @@ -714,7 +801,7 @@ impl WorkerTask { self.log(state.result_text()); WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id); - let _ = update_active_workers(None); + let _ = self.setup.update_active_workers(None); proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len()); } -- 2.30.2