From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id F0CAF6C6FA for ; Thu, 23 Sep 2021 13:37:12 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id DBAFF24B6A for ; Thu, 23 Sep 2021 13:36:42 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 721F824B5F for ; Thu, 23 Sep 2021 13:36:40 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 40C7944A77 for ; Thu, 23 Sep 2021 13:36:40 +0200 (CEST) Date: Thu, 23 Sep 2021 13:36:30 +0200 From: Fabian =?iso-8859-1?q?Gr=FCnbichler?= To: Proxmox Backup Server development discussion References: <20210923101329.950146-1-dietmar@proxmox.com> <20210923101329.950146-2-dietmar@proxmox.com> In-Reply-To: <20210923101329.950146-2-dietmar@proxmox.com> MIME-Version: 1.0 User-Agent: astroid/0.15.0 (https://github.com/astroidmail/astroid) Message-Id: <1632396562.g1fv04x0ym.astroid@nora.none> Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: quoted-printable X-SPAM-LEVEL: Spam detection results: 0 AWL 0.203 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment POISEN_SPAM_PILL 0.1 Meta: its spam POISEN_SPAM_PILL_2 0.1 random spam to be learned in bayes POISEN_SPAM_PILL_4 0.1 random spam to be learned in bayes SPF_HELO_NONE 
0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: Re: [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Thu, 23 Sep 2021 11:37:13 -0000 On September 23, 2021 12:13 pm, Dietmar Maurer wrote: > And application now needs to call init_worker_tasks() before using > worker tasks. >=20 > Notable changes: > - need to call init_worker_tasks() before using worker tasks. > - create_task_log_dirs() =C3=ADs called inside init_worker_tasks() > - removed UpidExt trait > - use atomic_open_or_create_file() > - remove pbs_config and pbs_buildcfg dependency > --- > src/api2/node/tasks.rs | 6 +- > src/bin/proxmox-backup-api.rs | 7 +- > src/bin/proxmox-backup-proxy.rs | 5 +- > src/server/mod.rs | 3 - > src/server/upid.rs | 18 -- > src/server/worker_task.rs | 475 +++++++++++++++++++------------- > 6 files changed, 290 insertions(+), 224 deletions(-) > delete mode 100644 src/server/upid.rs >=20 > diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs > index 169a3d4d..df4673a1 100644 > --- a/src/api2/node/tasks.rs > +++ b/src/api2/node/tasks.rs > @@ -16,7 +16,7 @@ use pbs_api_types::{ > }; > =20 > use crate::api2::pull::check_pull_privs; > -use crate::server::{self, UPIDExt, TaskState, TaskListInfoIterator}; > +use crate::server::{self, upid_log_path, upid_read_status, TaskState, Ta= skListInfoIterator}; > use pbs_config::CachedUserInfo; > =20 > // matches respective job execution privileges > @@ -220,7 +220,7 @@ async fn get_task_status( > if crate::server::worker_is_active(&upid).await? 
{ > result["status"] =3D Value::from("running"); > } else { > - let exitstatus =3D crate::server::upid_read_status(&upid).unwrap= _or(TaskState::Unknown { endtime: 0 }); > + let exitstatus =3D upid_read_status(&upid).unwrap_or(TaskState::= Unknown { endtime: 0 }); > result["status"] =3D Value::from("stopped"); > result["exitstatus"] =3D Value::from(exitstatus.to_string()); > }; > @@ -287,7 +287,7 @@ async fn read_task_log( > =20 > let mut count: u64 =3D 0; > =20 > - let path =3D upid.log_path(); > + let path =3D upid_log_path(&upid)?; > =20 > let file =3D File::open(path)?; > =20 > diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.r= s > index 9ad10260..9901b85d 100644 > --- a/src/bin/proxmox-backup-api.rs > +++ b/src/bin/proxmox-backup-api.rs > @@ -54,8 +54,6 @@ async fn run() -> Result<(), Error> { > bail!("unable to inititialize syslog - {}", err); > } > =20 > - server::create_task_log_dirs()?; > - > config::create_configdir()?; > =20 > config::update_self_signed_cert(false)?; > @@ -102,13 +100,14 @@ async fn run() -> Result<(), Error> { > =20 > config.enable_auth_log( > pbs_buildcfg::API_AUTH_LOG_FN, > - Some(dir_opts), > - Some(file_opts), > + Some(dir_opts.clone()), > + Some(file_opts.clone()), > &mut commando_sock, > )?; > =20 > =20 > let rest_server =3D RestServer::new(config); > + proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACK= UP_LOG_DIR_M!().into(), file_opts.clone())?; > =20 > // http server future: > let server =3D daemon::create_daemon( > diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-pro= xy.rs > index 518054bf..5d8ed189 100644 > --- a/src/bin/proxmox-backup-proxy.rs > +++ b/src/bin/proxmox-backup-proxy.rs > @@ -202,12 +202,13 @@ async fn run() -> Result<(), Error> { > =20 > config.enable_auth_log( > pbs_buildcfg::API_AUTH_LOG_FN, > - Some(dir_opts), > - Some(file_opts), > + Some(dir_opts.clone()), > + Some(file_opts.clone()), > &mut commando_sock, > )?; > =20 > let rest_server =3D 
RestServer::new(config); > + proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACK= UP_LOG_DIR_M!().into(), file_opts.clone())?; > =20 > //openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/pro= xy.key -out /etc/proxmox-backup/proxy.pem -nodes > =20 > diff --git a/src/server/mod.rs b/src/server/mod.rs > index a7dcee67..77320da6 100644 > --- a/src/server/mod.rs > +++ b/src/server/mod.rs > @@ -46,9 +46,6 @@ pub fn our_ctrl_sock() -> String { > ctrl_sock_from_pid(*PID) > } > =20 > -mod upid; > -pub use upid::*; > - > mod worker_task; > pub use worker_task::*; > =20 > diff --git a/src/server/upid.rs b/src/server/upid.rs > deleted file mode 100644 > index 70a3e3fb..00000000 > --- a/src/server/upid.rs > +++ /dev/null > @@ -1,18 +0,0 @@ > -pub trait UPIDExt: private::Sealed { > - /// Returns the absolute path to the task log file > - fn log_path(&self) -> std::path::PathBuf; > -} > - > -mod private { > - pub trait Sealed {} > - impl Sealed for pbs_api_types::UPID {} > -} > - > -impl UPIDExt for pbs_api_types::UPID { > - fn log_path(&self) -> std::path::PathBuf { > - let mut path =3D std::path::PathBuf::from(super::PROXMOX_BACKUP_= TASK_DIR); > - path.push(format!("{:02X}", self.pstart % 256)); > - path.push(self.to_string()); > - path > - } > -} > diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs > index 92ab50d7..191d8a44 100644 > --- a/src/server/worker_task.rs > +++ b/src/server/worker_task.rs > @@ -1,5 +1,6 @@ > use std::collections::{HashMap, VecDeque}; > use std::fs::File; > +use std::path::PathBuf; > use std::io::{Read, Write, BufRead, BufReader}; > use std::panic::UnwindSafe; > use std::sync::atomic::{AtomicBool, Ordering}; > @@ -11,27 +12,267 @@ use lazy_static::lazy_static; > use serde_json::{json, Value}; > use serde::{Serialize, Deserialize}; > use tokio::sync::oneshot; > +use nix::fcntl::OFlag; > +use once_cell::sync::OnceCell; > =20 > use proxmox::sys::linux::procfs; > use proxmox::try_block; > -use 
proxmox::tools::fs::{create_path, replace_file, CreateOptions}; > +use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_creat= e_file, CreateOptions}; > =20 > -use pbs_buildcfg; > use pbs_tools::logrotate::{LogRotate, LogRotateFiles}; > use pbs_api_types::UPID; > -use pbs_config::{open_backup_lockfile, BackupLockGuard}; > use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions}; > =20 > -use super::UPIDExt; > +struct TaskListLockGuard(File); > =20 > -macro_rules! taskdir { > - ($subdir:expr) =3D> (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!= (), "/tasks", $subdir)) > +struct WorkerTaskSetup { > + file_opts: CreateOptions, > + taskdir: PathBuf, > + task_lock_fn: PathBuf, > + active_tasks_fn: PathBuf, > + task_index_fn: PathBuf, > + task_archive_fn: PathBuf, > +} > + > +static WORKER_TASK_SETUP: OnceCell =3D OnceCell::new(); > + > +fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> { > + WORKER_TASK_SETUP.get() > + .ok_or_else(|| format_err!("WorkerTask library is not initialize= d")) > +} > + > +impl WorkerTaskSetup { > + > + fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self { > + > + let mut taskdir =3D basedir.clone(); > + taskdir.push("tasks"); > + > + let mut task_lock_fn =3D taskdir.clone(); > + task_lock_fn.push(".active.lock"); > + > + let mut active_tasks_fn =3D taskdir.clone(); > + active_tasks_fn.push("active"); > + > + let mut task_index_fn =3D taskdir.clone(); > + task_index_fn.push("index"); > + > + let mut task_archive_fn =3D taskdir.clone(); > + task_archive_fn.push("archive"); > + > + Self { > + file_opts, > + taskdir, > + task_lock_fn, > + active_tasks_fn, > + task_index_fn, > + task_archive_fn, > + } > + } > + > + fn lock_task_list_files(&self, exclusive: bool) -> Result { Since we touch/move this (and thus have to touch all call sites), we=20 could take this opportunity to move the locked operations including=20 access to the active_tasks_fn, task_index_fn and task_archive_fn struct=20 members
to the lock guard (and maybe split it into=20 exclusive/non-exclusive guards?) to make misuse impossible? AFAICT all=20 the current access is done while holding the lock. This can of course also be done as a follow-up in some generic fashion, since=20 this is a recurring problem - it just struck me while reading through the=20 rest of the changes. > + let options =3D self.file_opts.clone() > + .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); > + > + let timeout =3D std::time::Duration::new(10, 0); > + > + let file =3D proxmox::tools::fs::open_file_locked( > + &self.task_lock_fn, > + timeout, > + exclusive, > + options, > + )?; > + > + Ok(TaskListLockGuard(file)) > + } > + > + fn log_path(&self, upid: &UPID) -> std::path::PathBuf { > + let mut path =3D self.taskdir.clone(); > + path.push(format!("{:02X}", upid.pstart % 256)); > + path.push(upid.to_string()); > + path > + } > + > + // atomically read/update the task list, update status of finished t= asks > + // new_upid is added to the list when specified. > + fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(= ), Error> { > + > + let lock =3D self.lock_task_list_files(true)?; > + > + // TODO remove with 1.x > + let mut finish_list: Vec =3D read_task_file_from_p= ath(&self.task_index_fn)?; > + let had_index_file =3D !finish_list.is_empty(); > + > + // We use filter_map because one negative case wants to *move* t= he data into `finish_list`, > + // clippy doesn't quite catch this! > + #[allow(clippy::unnecessary_filter_map)] > + let mut active_list: Vec =3D read_task_file_from_p= ath(&self.active_tasks_fn)?
> + .into_iter() > + .filter_map(|info| { > + if info.state.is_some() { > + // this can happen when the active file still includ= es finished tasks > + finish_list.push(info); > + return None; > + } > + > + if !worker_is_active_local(&info.upid) { > + // println!("Detected stopped task '{}'", &info.upid= _str); > + let now =3D proxmox::tools::time::epoch_i64(); > + let status =3D upid_read_status(&info.upid).unwrap_o= r(TaskState::Unknown { endtime: now }); > + finish_list.push(TaskListInfo { > + upid: info.upid, > + upid_str: info.upid_str, > + state: Some(status) > + }); > + return None; > + } > + > + Some(info) > + }).collect(); > + > + if let Some(upid) =3D new_upid { > + active_list.push(TaskListInfo { upid: upid.clone(), upid_str= : upid.to_string(), state: None }); > + } > + > + let active_raw =3D render_task_list(&active_list); > + > + let options =3D self.file_opts.clone() > + .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); > + > + replace_file( > + &self.active_tasks_fn, > + active_raw.as_bytes(), > + options, > + )?; > + > + finish_list.sort_unstable_by(|a, b| { > + match (&a.state, &b.state) { > + (Some(s1), Some(s2)) =3D> s1.cmp(&s2), > + (Some(_), None) =3D> std::cmp::Ordering::Less, > + (None, Some(_)) =3D> std::cmp::Ordering::Greater, > + _ =3D> a.upid.starttime.cmp(&b.upid.starttime), > + } > + }); > + > + if !finish_list.is_empty() { > + let options =3D self.file_opts.clone() > + .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); > + > + let mut writer =3D atomic_open_or_create_file( > + &self.task_archive_fn, > + OFlag::O_APPEND | OFlag::O_RDWR, > + &[], > + options, > + )?; > + for info in &finish_list { > + writer.write_all(render_task_line(&info).as_bytes())?; > + } > + } > + > + // TODO Remove with 1.x > + // for compatibility, if we had an INDEX file, we do not need it= anymore > + if had_index_file { > + let _ =3D nix::unistd::unlink(&self.task_index_fn); > + } > + > + drop(lock); > + > + Ok(()) > + } > + > + // Create task log 
directory with correct permissions > + fn create_task_log_dirs(&self) -> Result<(), Error> { > + > + try_block!({ > + let dir_opts =3D self.file_opts.clone() > + .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); > + > + create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_= opts.clone()))?; > + // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DI= R, None, Some(opts))?; > + Ok(()) > + }).map_err(|err: Error| format_err!("unable to create task log d= ir - {}", err)) > + } > +} > + > +/// Initialize the WorkerTask library > +pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> = Result<(), Error> { > + let setup =3D WorkerTaskSetup::new(basedir, file_opts); > + setup.create_task_log_dirs()?; > + WORKER_TASK_SETUP.set(setup) > + .map_err(|_| format_err!("init_worker_tasks failed - already ini= tialized")) > +} > + > +/// checks if the Task Archive is bigger that 'size_threshold' bytes, an= d > +/// rotates it if it is > +pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_= files: Option) -> Result { > + > + let setup =3D worker_task_setup()?; > + > + let _lock =3D setup.lock_task_list_files(true)?; > + > + let mut logrotate =3D LogRotate::new(&setup.task_archive_fn, compres= s) > + .ok_or_else(|| format_err!("could not get archive file names= "))?; > + > + logrotate.rotate(size_threshold, None, max_files) > +} > + > + > +/// Path to the worker log file > +pub fn upid_log_path(upid: &UPID) -> Result { > + let setup =3D worker_task_setup()?; > + Ok(setup.log_path(upid)) > +} > + > +/// Read endtime (time of last log line) and exitstatus from task log fi= le > +/// If there is not a single line with at valid datetime, we assume the > +/// starttime to be the endtime > +pub fn upid_read_status(upid: &UPID) -> Result { > + > + let setup =3D worker_task_setup()?; > + > + let mut status =3D TaskState::Unknown { endtime: upid.starttime }; > + > + let path =3D setup.log_path(upid); > + > + let mut file =3D 
File::open(path)?; > + > + /// speedup - only read tail > + use std::io::Seek; > + use std::io::SeekFrom; > + let _ =3D file.seek(SeekFrom::End(-8192)); // ignore errors > + > + let mut data =3D Vec::with_capacity(8192); > + file.read_to_end(&mut data)?; > + > + // strip newlines at the end of the task logs > + while data.last() =3D=3D Some(&b'\n') { > + data.pop(); > + } > + > + let last_line =3D match data.iter().rposition(|c| *c =3D=3D b'\n') { > + Some(start) if data.len() > (start+1) =3D> &data[start+1..], > + Some(_) =3D> &data, // should not happen, since we removed all t= railing newlines > + None =3D> &data, > + }; > + > + let last_line =3D std::str::from_utf8(last_line) > + .map_err(|err| format_err!("upid_read_status: utf8 parse failed:= {}", err))?; > + > + let mut iter =3D last_line.splitn(2, ": "); > + if let Some(time_str) =3D iter.next() { > + if let Ok(endtime) =3D proxmox::tools::time::parse_rfc3339(time_= str) { > + // set the endtime even if we cannot parse the state > + status =3D TaskState::Unknown { endtime }; > + if let Some(rest) =3D iter.next().and_then(|rest| rest.strip= _prefix("TASK ")) { > + if let Ok(state) =3D TaskState::from_endtime_and_message= (endtime, rest) { > + status =3D state; > + } > + } > + } > + } > + > + Ok(status) > } > -pub const PROXMOX_BACKUP_TASK_DIR: &str =3D taskdir!("/"); > -pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str =3D taskdir!("/.active.lock"= ); > -pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str =3D taskdir!("/active"); > -pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str =3D taskdir!("/index"); > -pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str =3D taskdir!("/archive"); > =20 > lazy_static! 
{ > static ref WORKER_TASK_LIST: Mutex>> = =3D Mutex::new(HashMap::new()); > @@ -152,73 +393,6 @@ fn parse_worker_status_line(line: &str) -> Result<(S= tring, UPID, Option } > } > =20 > -/// Create task log directory with correct permissions > -pub fn create_task_log_dirs() -> Result<(), Error> { > - > - try_block!({ > - let backup_user =3D pbs_config::backup_user()?; > - let opts =3D CreateOptions::new() > - .owner(backup_user.uid) > - .group(backup_user.gid); > - > - create_path(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opt= s.clone()))?; > - create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?; > - create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opt= s))?; > - Ok(()) > - }).map_err(|err: Error| format_err!("unable to create task log dir -= {}", err))?; > - > - Ok(()) > -} > - > -/// Read endtime (time of last log line) and exitstatus from task log fi= le > -/// If there is not a single line with at valid datetime, we assume the > -/// starttime to be the endtime > -pub fn upid_read_status(upid: &UPID) -> Result { > - > - let mut status =3D TaskState::Unknown { endtime: upid.starttime }; > - > - let path =3D upid.log_path(); > - > - let mut file =3D File::open(path)?; > - > - /// speedup - only read tail > - use std::io::Seek; > - use std::io::SeekFrom; > - let _ =3D file.seek(SeekFrom::End(-8192)); // ignore errors > - > - let mut data =3D Vec::with_capacity(8192); > - file.read_to_end(&mut data)?; > - > - // strip newlines at the end of the task logs > - while data.last() =3D=3D Some(&b'\n') { > - data.pop(); > - } > - > - let last_line =3D match data.iter().rposition(|c| *c =3D=3D b'\n') { > - Some(start) if data.len() > (start+1) =3D> &data[start+1..], > - Some(_) =3D> &data, // should not happen, since we removed all t= railing newlines > - None =3D> &data, > - }; > - > - let last_line =3D std::str::from_utf8(last_line) > - .map_err(|err| format_err!("upid_read_status: utf8 parse failed:= {}", err))?; > - > - let mut iter =3D 
last_line.splitn(2, ": "); > - if let Some(time_str) =3D iter.next() { > - if let Ok(endtime) =3D proxmox::tools::time::parse_rfc3339(time_= str) { > - // set the endtime even if we cannot parse the state > - status =3D TaskState::Unknown { endtime }; > - if let Some(rest) =3D iter.next().and_then(|rest| rest.strip= _prefix("TASK ")) { > - if let Ok(state) =3D TaskState::from_endtime_and_message= (endtime, rest) { > - status =3D state; > - } > - } > - } > - } > - > - Ok(status) > -} > - > /// Task State > #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] > pub enum TaskState { > @@ -323,107 +497,6 @@ impl Into for TaskList= Info { > } > } > =20 > -fn lock_task_list_files(exclusive: bool) -> Result { > - open_backup_lockfile(PROXMOX_BACKUP_TASK_LOCK_FN, None, exclusive) > -} > - > -/// checks if the Task Archive is bigger that 'size_threshold' bytes, an= d > -/// rotates it if it is > -pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_= files: Option) -> Result { > - let _lock =3D lock_task_list_files(true)?; > - > - let mut logrotate =3D LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN,= compress) > - .ok_or_else(|| format_err!("could not get archive file names"))?= ; > - > - logrotate.rotate(size_threshold, None, max_files) > -} > - > -// atomically read/update the task list, update status of finished tasks > -// new_upid is added to the list when specified. > -fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> { > - > - let backup_user =3D pbs_config::backup_user()?; > - > - let lock =3D lock_task_list_files(true)?; > - > - // TODO remove with 1.x > - let mut finish_list: Vec =3D read_task_file_from_path(= PROXMOX_BACKUP_INDEX_TASK_FN)?; > - let had_index_file =3D !finish_list.is_empty(); > - > - // We use filter_map because one negative case wants to *move* the d= ata into `finish_list`, > - // clippy doesn't quite catch this! 
> - #[allow(clippy::unnecessary_filter_map)] > - let mut active_list: Vec =3D read_task_file_from_path(= PROXMOX_BACKUP_ACTIVE_TASK_FN)? > - .into_iter() > - .filter_map(|info| { > - if info.state.is_some() { > - // this can happen when the active file still includes f= inished tasks > - finish_list.push(info); > - return None; > - } > - > - if !worker_is_active_local(&info.upid) { > - // println!("Detected stopped task '{}'", &info.upid_str= ); > - let now =3D proxmox::tools::time::epoch_i64(); > - let status =3D upid_read_status(&info.upid).unwrap_or(Ta= skState::Unknown { endtime: now }); > - finish_list.push(TaskListInfo { > - upid: info.upid, > - upid_str: info.upid_str, > - state: Some(status) > - }); > - return None; > - } > - > - Some(info) > - }).collect(); > - > - if let Some(upid) =3D new_upid { > - active_list.push(TaskListInfo { upid: upid.clone(), upid_str: up= id.to_string(), state: None }); > - } > - > - let active_raw =3D render_task_list(&active_list); > - > - replace_file( > - PROXMOX_BACKUP_ACTIVE_TASK_FN, > - active_raw.as_bytes(), > - CreateOptions::new() > - .owner(backup_user.uid) > - .group(backup_user.gid), > - )?; > - > - finish_list.sort_unstable_by(|a, b| { > - match (&a.state, &b.state) { > - (Some(s1), Some(s2)) =3D> s1.cmp(&s2), > - (Some(_), None) =3D> std::cmp::Ordering::Less, > - (None, Some(_)) =3D> std::cmp::Ordering::Greater, > - _ =3D> a.upid.starttime.cmp(&b.upid.starttime), > - } > - }); > - > - if !finish_list.is_empty() { > - match std::fs::OpenOptions::new().append(true).create(true).open= (PROXMOX_BACKUP_ARCHIVE_TASK_FN) { > - Ok(mut writer) =3D> { > - for info in &finish_list { > - writer.write_all(render_task_line(&info).as_bytes())= ?; > - } > - }, > - Err(err) =3D> bail!("could not write task archive - {}", err= ), > - } > - > - nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_u= ser.uid), Some(backup_user.gid))?; > - } > - > - // TODO Remove with 1.x > - // for compatibility, if we had an INDEX file, 
we do not need it any= more > - if had_index_file { > - let _ =3D nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN); > - } > - > - drop(lock); > - > - Ok(()) > -} > - > fn render_task_line(info: &TaskListInfo) -> String { > let mut raw =3D String::new(); > if let Some(status) =3D &info.state { > @@ -486,27 +559,30 @@ pub struct TaskListInfoIterator { > list: VecDeque, > end: bool, > archive: Option, > - lock: Option, > + lock: Option, > } > =20 > impl TaskListInfoIterator { > pub fn new(active_only: bool) -> Result { > + > + let setup =3D worker_task_setup()?; > + > let (read_lock, active_list) =3D { > - let lock =3D lock_task_list_files(false)?; > - let active_list =3D read_task_file_from_path(PROXMOX_BACKUP_= ACTIVE_TASK_FN)?; > + let lock =3D setup.lock_task_list_files(false)?; > + let active_list =3D read_task_file_from_path(&setup.active_t= asks_fn)?; > =20 > let needs_update =3D active_list > .iter() > .any(|info| info.state.is_some() || !worker_is_active_lo= cal(&info.upid)); > =20 > // TODO remove with 1.x > - let index_exists =3D std::path::Path::new(PROXMOX_BACKUP_IND= EX_TASK_FN).is_file(); > + let index_exists =3D setup.task_index_fn.is_file(); > =20 > if needs_update || index_exists { > drop(lock); > - update_active_workers(None)?; > - let lock =3D lock_task_list_files(false)?; > - let active_list =3D read_task_file_from_path(PROXMOX_BAC= KUP_ACTIVE_TASK_FN)?; > + setup.update_active_workers(None)?; > + let lock =3D setup.lock_task_list_files(false)?; > + let active_list =3D read_task_file_from_path(&setup.acti= ve_tasks_fn)?; > (lock, active_list) > } else { > (lock, active_list) > @@ -516,7 +592,7 @@ impl TaskListInfoIterator { > let archive =3D if active_only { > None > } else { > - let logrotate =3D LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK= _FN, true) > + let logrotate =3D LogRotate::new(&setup.task_archive_fn, tru= e) > .ok_or_else(|| format_err!("could not get archive file n= ames"))?; > Some(logrotate.files()) > }; > @@ -568,6 +644,7 @@ impl 
Iterator for TaskListInfoIterator { > /// persistently to files. Task should poll the `abort_requested` > /// flag, and stop execution when requested. > pub struct WorkerTask { > + setup: &'static WorkerTaskSetup, > upid: UPID, > data: Mutex, > abort_requested: AtomicBool, > @@ -589,17 +666,26 @@ struct WorkerTaskData { > =20 > impl WorkerTask { > =20 > - pub fn new(worker_type: &str, worker_id: Option, auth_id: St= ring, to_stdout: bool) -> Result, Error> { > + pub fn new( > + worker_type: &str, > + worker_id: Option, > + auth_id: String, > + to_stdout: bool, > + ) -> Result, Error> { > + > + let setup =3D worker_task_setup()?; > + > let upid =3D UPID::new(worker_type, worker_id, auth_id)?; > let task_id =3D upid.task_id; > =20 > - let mut path =3D std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DI= R); > + let mut path =3D setup.taskdir.clone(); > =20 > path.push(format!("{:02X}", upid.pstart & 255)); > =20 > - let backup_user =3D pbs_config::backup_user()?; > + let dir_opts =3D setup.file_opts.clone() > + .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); > =20 > - create_path(&path, None, Some(CreateOptions::new().owner(backup_= user.uid).group(backup_user.gid)))?; > + create_path(&path, None, Some(dir_opts))?; > =20 > path.push(upid.to_string()); > =20 > @@ -608,12 +694,13 @@ impl WorkerTask { > exclusive: true, > prefix_time: true, > read: true, > + file_opts: setup.file_opts.clone(), > ..Default::default() > }; > let logger =3D FileLogger::new(&path, logger_options)?; > - nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_use= r.gid))?; > =20 > let worker =3D Arc::new(Self { > + setup, > upid: upid.clone(), > abort_requested: AtomicBool::new(false), > data: Mutex::new(WorkerTaskData { > @@ -631,7 +718,7 @@ impl WorkerTask { > proxmox_rest_server::set_worker_count(hash.len()); > } > =20 > - update_active_workers(Some(&upid))?; > + setup.update_active_workers(Some(&upid))?; > =20 > Ok(worker) > } > @@ -714,7 +801,7 @@ impl WorkerTask { > 
self.log(state.result_text()); > =20 > WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id); > - let _ =3D update_active_workers(None); > + let _ =3D self.setup.update_active_workers(None); > proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().un= wrap().len()); > } > =20 > --=20 > 2.30.2 >=20 >=20 >=20 > _______________________________________________ > pbs-devel mailing list > pbs-devel@lists.proxmox.com > https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel >=20