From: Dietmar Maurer <dietmar@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group
Date: Thu, 23 Sep 2021 12:13:25 +0200 [thread overview]
Message-ID: <20210923101329.950146-2-dietmar@proxmox.com> (raw)
In-Reply-To: <20210923101329.950146-1-dietmar@proxmox.com>
And application now needs to call init_worker_tasks() before using
worker tasks.
Notable changes:
- need to call init_worker_tasks() before using worker tasks.
- create_task_log_dirs() ís called inside init_worker_tasks()
- removed UpidExt trait
- use atomic_open_or_create_file()
- remove pbs_config and pbs_buildcfg dependency
---
src/api2/node/tasks.rs | 6 +-
src/bin/proxmox-backup-api.rs | 7 +-
src/bin/proxmox-backup-proxy.rs | 5 +-
src/server/mod.rs | 3 -
src/server/upid.rs | 18 --
src/server/worker_task.rs | 475 +++++++++++++++++++-------------
6 files changed, 290 insertions(+), 224 deletions(-)
delete mode 100644 src/server/upid.rs
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index 169a3d4d..df4673a1 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -16,7 +16,7 @@ use pbs_api_types::{
};
use crate::api2::pull::check_pull_privs;
-use crate::server::{self, UPIDExt, TaskState, TaskListInfoIterator};
+use crate::server::{self, upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
use pbs_config::CachedUserInfo;
// matches respective job execution privileges
@@ -220,7 +220,7 @@ async fn get_task_status(
if crate::server::worker_is_active(&upid).await? {
result["status"] = Value::from("running");
} else {
- let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
+ let exitstatus = upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
result["status"] = Value::from("stopped");
result["exitstatus"] = Value::from(exitstatus.to_string());
};
@@ -287,7 +287,7 @@ async fn read_task_log(
let mut count: u64 = 0;
- let path = upid.log_path();
+ let path = upid_log_path(&upid)?;
let file = File::open(path)?;
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index 9ad10260..9901b85d 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -54,8 +54,6 @@ async fn run() -> Result<(), Error> {
bail!("unable to inititialize syslog - {}", err);
}
- server::create_task_log_dirs()?;
-
config::create_configdir()?;
config::update_self_signed_cert(false)?;
@@ -102,13 +100,14 @@ async fn run() -> Result<(), Error> {
config.enable_auth_log(
pbs_buildcfg::API_AUTH_LOG_FN,
- Some(dir_opts),
- Some(file_opts),
+ Some(dir_opts.clone()),
+ Some(file_opts.clone()),
&mut commando_sock,
)?;
let rest_server = RestServer::new(config);
+ proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
// http server future:
let server = daemon::create_daemon(
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 518054bf..5d8ed189 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -202,12 +202,13 @@ async fn run() -> Result<(), Error> {
config.enable_auth_log(
pbs_buildcfg::API_AUTH_LOG_FN,
- Some(dir_opts),
- Some(file_opts),
+ Some(dir_opts.clone()),
+ Some(file_opts.clone()),
&mut commando_sock,
)?;
let rest_server = RestServer::new(config);
+ proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
//openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
diff --git a/src/server/mod.rs b/src/server/mod.rs
index a7dcee67..77320da6 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -46,9 +46,6 @@ pub fn our_ctrl_sock() -> String {
ctrl_sock_from_pid(*PID)
}
-mod upid;
-pub use upid::*;
-
mod worker_task;
pub use worker_task::*;
diff --git a/src/server/upid.rs b/src/server/upid.rs
deleted file mode 100644
index 70a3e3fb..00000000
--- a/src/server/upid.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-pub trait UPIDExt: private::Sealed {
- /// Returns the absolute path to the task log file
- fn log_path(&self) -> std::path::PathBuf;
-}
-
-mod private {
- pub trait Sealed {}
- impl Sealed for pbs_api_types::UPID {}
-}
-
-impl UPIDExt for pbs_api_types::UPID {
- fn log_path(&self) -> std::path::PathBuf {
- let mut path = std::path::PathBuf::from(super::PROXMOX_BACKUP_TASK_DIR);
- path.push(format!("{:02X}", self.pstart % 256));
- path.push(self.to_string());
- path
- }
-}
diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index 92ab50d7..191d8a44 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -1,5 +1,6 @@
use std::collections::{HashMap, VecDeque};
use std::fs::File;
+use std::path::PathBuf;
use std::io::{Read, Write, BufRead, BufReader};
use std::panic::UnwindSafe;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -11,27 +12,267 @@ use lazy_static::lazy_static;
use serde_json::{json, Value};
use serde::{Serialize, Deserialize};
use tokio::sync::oneshot;
+use nix::fcntl::OFlag;
+use once_cell::sync::OnceCell;
use proxmox::sys::linux::procfs;
use proxmox::try_block;
-use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
+use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
-use pbs_buildcfg;
use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
use pbs_api_types::UPID;
-use pbs_config::{open_backup_lockfile, BackupLockGuard};
use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
-use super::UPIDExt;
+struct TaskListLockGuard(File);
-macro_rules! taskdir {
- ($subdir:expr) => (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
+struct WorkerTaskSetup {
+ file_opts: CreateOptions,
+ taskdir: PathBuf,
+ task_lock_fn: PathBuf,
+ active_tasks_fn: PathBuf,
+ task_index_fn: PathBuf,
+ task_archive_fn: PathBuf,
+}
+
+static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
+
+fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
+ WORKER_TASK_SETUP.get()
+ .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
+}
+
+impl WorkerTaskSetup {
+
+ fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
+
+ let mut taskdir = basedir.clone();
+ taskdir.push("tasks");
+
+ let mut task_lock_fn = taskdir.clone();
+ task_lock_fn.push(".active.lock");
+
+ let mut active_tasks_fn = taskdir.clone();
+ active_tasks_fn.push("active");
+
+ let mut task_index_fn = taskdir.clone();
+ task_index_fn.push("index");
+
+ let mut task_archive_fn = taskdir.clone();
+ task_archive_fn.push("archive");
+
+ Self {
+ file_opts,
+ taskdir,
+ task_lock_fn,
+ active_tasks_fn,
+ task_index_fn,
+ task_archive_fn,
+ }
+ }
+
+ fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
+ let options = self.file_opts.clone()
+ .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+ let timeout = std::time::Duration::new(10, 0);
+
+ let file = proxmox::tools::fs::open_file_locked(
+ &self.task_lock_fn,
+ timeout,
+ exclusive,
+ options,
+ )?;
+
+ Ok(TaskListLockGuard(file))
+ }
+
+ fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
+ let mut path = self.taskdir.clone();
+ path.push(format!("{:02X}", upid.pstart % 256));
+ path.push(upid.to_string());
+ path
+ }
+
+ // atomically read/update the task list, update status of finished tasks
+ // new_upid is added to the list when specified.
+ fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
+
+ let lock = self.lock_task_list_files(true)?;
+
+ // TODO remove with 1.x
+ let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
+ let had_index_file = !finish_list.is_empty();
+
+ // We use filter_map because one negative case wants to *move* the data into `finish_list`,
+ // clippy doesn't quite catch this!
+ #[allow(clippy::unnecessary_filter_map)]
+ let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
+ .into_iter()
+ .filter_map(|info| {
+ if info.state.is_some() {
+ // this can happen when the active file still includes finished tasks
+ finish_list.push(info);
+ return None;
+ }
+
+ if !worker_is_active_local(&info.upid) {
+ // println!("Detected stopped task '{}'", &info.upid_str);
+ let now = proxmox::tools::time::epoch_i64();
+ let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
+ finish_list.push(TaskListInfo {
+ upid: info.upid,
+ upid_str: info.upid_str,
+ state: Some(status)
+ });
+ return None;
+ }
+
+ Some(info)
+ }).collect();
+
+ if let Some(upid) = new_upid {
+ active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
+ }
+
+ let active_raw = render_task_list(&active_list);
+
+ let options = self.file_opts.clone()
+ .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+ replace_file(
+ &self.active_tasks_fn,
+ active_raw.as_bytes(),
+ options,
+ )?;
+
+ finish_list.sort_unstable_by(|a, b| {
+ match (&a.state, &b.state) {
+ (Some(s1), Some(s2)) => s1.cmp(&s2),
+ (Some(_), None) => std::cmp::Ordering::Less,
+ (None, Some(_)) => std::cmp::Ordering::Greater,
+ _ => a.upid.starttime.cmp(&b.upid.starttime),
+ }
+ });
+
+ if !finish_list.is_empty() {
+ let options = self.file_opts.clone()
+ .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+ let mut writer = atomic_open_or_create_file(
+ &self.task_archive_fn,
+ OFlag::O_APPEND | OFlag::O_RDWR,
+ &[],
+ options,
+ )?;
+ for info in &finish_list {
+ writer.write_all(render_task_line(&info).as_bytes())?;
+ }
+ }
+
+ // TODO Remove with 1.x
+ // for compatibility, if we had an INDEX file, we do not need it anymore
+ if had_index_file {
+ let _ = nix::unistd::unlink(&self.task_index_fn);
+ }
+
+ drop(lock);
+
+ Ok(())
+ }
+
+ // Create task log directory with correct permissions
+ fn create_task_log_dirs(&self) -> Result<(), Error> {
+
+ try_block!({
+ let dir_opts = self.file_opts.clone()
+ .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
+
+ create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
+ // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
+ Ok(())
+ }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
+ }
+}
+
+/// Initialize the WorkerTask library
+pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
+ let setup = WorkerTaskSetup::new(basedir, file_opts);
+ setup.create_task_log_dirs()?;
+ WORKER_TASK_SETUP.set(setup)
+ .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
+}
+
+/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
+/// rotates it if it is
+pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
+
+ let setup = worker_task_setup()?;
+
+ let _lock = setup.lock_task_list_files(true)?;
+
+ let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress)
+ .ok_or_else(|| format_err!("could not get archive file names"))?;
+
+ logrotate.rotate(size_threshold, None, max_files)
+}
+
+
+/// Path to the worker log file
+pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
+ let setup = worker_task_setup()?;
+ Ok(setup.log_path(upid))
+}
+
+/// Read endtime (time of last log line) and exitstatus from task log file
+/// If there is not a single line with at valid datetime, we assume the
+/// starttime to be the endtime
+pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
+
+ let setup = worker_task_setup()?;
+
+ let mut status = TaskState::Unknown { endtime: upid.starttime };
+
+ let path = setup.log_path(upid);
+
+ let mut file = File::open(path)?;
+
+ /// speedup - only read tail
+ use std::io::Seek;
+ use std::io::SeekFrom;
+ let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
+
+ let mut data = Vec::with_capacity(8192);
+ file.read_to_end(&mut data)?;
+
+ // strip newlines at the end of the task logs
+ while data.last() == Some(&b'\n') {
+ data.pop();
+ }
+
+ let last_line = match data.iter().rposition(|c| *c == b'\n') {
+ Some(start) if data.len() > (start+1) => &data[start+1..],
+ Some(_) => &data, // should not happen, since we removed all trailing newlines
+ None => &data,
+ };
+
+ let last_line = std::str::from_utf8(last_line)
+ .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
+
+ let mut iter = last_line.splitn(2, ": ");
+ if let Some(time_str) = iter.next() {
+ if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
+ // set the endtime even if we cannot parse the state
+ status = TaskState::Unknown { endtime };
+ if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
+ if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
+ status = state;
+ }
+ }
+ }
+ }
+
+ Ok(status)
}
-pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/");
-pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock");
-pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active");
-pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index");
-pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
lazy_static! {
static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
@@ -152,73 +393,6 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskStat
}
}
-/// Create task log directory with correct permissions
-pub fn create_task_log_dirs() -> Result<(), Error> {
-
- try_block!({
- let backup_user = pbs_config::backup_user()?;
- let opts = CreateOptions::new()
- .owner(backup_user.uid)
- .group(backup_user.gid);
-
- create_path(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
- create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
- create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
- Ok(())
- }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
-
- Ok(())
-}
-
-/// Read endtime (time of last log line) and exitstatus from task log file
-/// If there is not a single line with at valid datetime, we assume the
-/// starttime to be the endtime
-pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
-
- let mut status = TaskState::Unknown { endtime: upid.starttime };
-
- let path = upid.log_path();
-
- let mut file = File::open(path)?;
-
- /// speedup - only read tail
- use std::io::Seek;
- use std::io::SeekFrom;
- let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
-
- let mut data = Vec::with_capacity(8192);
- file.read_to_end(&mut data)?;
-
- // strip newlines at the end of the task logs
- while data.last() == Some(&b'\n') {
- data.pop();
- }
-
- let last_line = match data.iter().rposition(|c| *c == b'\n') {
- Some(start) if data.len() > (start+1) => &data[start+1..],
- Some(_) => &data, // should not happen, since we removed all trailing newlines
- None => &data,
- };
-
- let last_line = std::str::from_utf8(last_line)
- .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
-
- let mut iter = last_line.splitn(2, ": ");
- if let Some(time_str) = iter.next() {
- if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
- // set the endtime even if we cannot parse the state
- status = TaskState::Unknown { endtime };
- if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
- if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
- status = state;
- }
- }
- }
- }
-
- Ok(status)
-}
-
/// Task State
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskState {
@@ -323,107 +497,6 @@ impl Into<pbs_api_types::TaskListItem> for TaskListInfo {
}
}
-fn lock_task_list_files(exclusive: bool) -> Result<BackupLockGuard, Error> {
- open_backup_lockfile(PROXMOX_BACKUP_TASK_LOCK_FN, None, exclusive)
-}
-
-/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
-/// rotates it if it is
-pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
- let _lock = lock_task_list_files(true)?;
-
- let mut logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, compress)
- .ok_or_else(|| format_err!("could not get archive file names"))?;
-
- logrotate.rotate(size_threshold, None, max_files)
-}
-
-// atomically read/update the task list, update status of finished tasks
-// new_upid is added to the list when specified.
-fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
-
- let backup_user = pbs_config::backup_user()?;
-
- let lock = lock_task_list_files(true)?;
-
- // TODO remove with 1.x
- let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
- let had_index_file = !finish_list.is_empty();
-
- // We use filter_map because one negative case wants to *move* the data into `finish_list`,
- // clippy doesn't quite catch this!
- #[allow(clippy::unnecessary_filter_map)]
- let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
- .into_iter()
- .filter_map(|info| {
- if info.state.is_some() {
- // this can happen when the active file still includes finished tasks
- finish_list.push(info);
- return None;
- }
-
- if !worker_is_active_local(&info.upid) {
- // println!("Detected stopped task '{}'", &info.upid_str);
- let now = proxmox::tools::time::epoch_i64();
- let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
- finish_list.push(TaskListInfo {
- upid: info.upid,
- upid_str: info.upid_str,
- state: Some(status)
- });
- return None;
- }
-
- Some(info)
- }).collect();
-
- if let Some(upid) = new_upid {
- active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
- }
-
- let active_raw = render_task_list(&active_list);
-
- replace_file(
- PROXMOX_BACKUP_ACTIVE_TASK_FN,
- active_raw.as_bytes(),
- CreateOptions::new()
- .owner(backup_user.uid)
- .group(backup_user.gid),
- )?;
-
- finish_list.sort_unstable_by(|a, b| {
- match (&a.state, &b.state) {
- (Some(s1), Some(s2)) => s1.cmp(&s2),
- (Some(_), None) => std::cmp::Ordering::Less,
- (None, Some(_)) => std::cmp::Ordering::Greater,
- _ => a.upid.starttime.cmp(&b.upid.starttime),
- }
- });
-
- if !finish_list.is_empty() {
- match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) {
- Ok(mut writer) => {
- for info in &finish_list {
- writer.write_all(render_task_line(&info).as_bytes())?;
- }
- },
- Err(err) => bail!("could not write task archive - {}", err),
- }
-
- nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
- }
-
- // TODO Remove with 1.x
- // for compatibility, if we had an INDEX file, we do not need it anymore
- if had_index_file {
- let _ = nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN);
- }
-
- drop(lock);
-
- Ok(())
-}
-
fn render_task_line(info: &TaskListInfo) -> String {
let mut raw = String::new();
if let Some(status) = &info.state {
@@ -486,27 +559,30 @@ pub struct TaskListInfoIterator {
list: VecDeque<TaskListInfo>,
end: bool,
archive: Option<LogRotateFiles>,
- lock: Option<BackupLockGuard>,
+ lock: Option<TaskListLockGuard>,
}
impl TaskListInfoIterator {
pub fn new(active_only: bool) -> Result<Self, Error> {
+
+ let setup = worker_task_setup()?;
+
let (read_lock, active_list) = {
- let lock = lock_task_list_files(false)?;
- let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
+ let lock = setup.lock_task_list_files(false)?;
+ let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
let needs_update = active_list
.iter()
.any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
// TODO remove with 1.x
- let index_exists = std::path::Path::new(PROXMOX_BACKUP_INDEX_TASK_FN).is_file();
+ let index_exists = setup.task_index_fn.is_file();
if needs_update || index_exists {
drop(lock);
- update_active_workers(None)?;
- let lock = lock_task_list_files(false)?;
- let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
+ setup.update_active_workers(None)?;
+ let lock = setup.lock_task_list_files(false)?;
+ let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
(lock, active_list)
} else {
(lock, active_list)
@@ -516,7 +592,7 @@ impl TaskListInfoIterator {
let archive = if active_only {
None
} else {
- let logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, true)
+ let logrotate = LogRotate::new(&setup.task_archive_fn, true)
.ok_or_else(|| format_err!("could not get archive file names"))?;
Some(logrotate.files())
};
@@ -568,6 +644,7 @@ impl Iterator for TaskListInfoIterator {
/// persistently to files. Task should poll the `abort_requested`
/// flag, and stop execution when requested.
pub struct WorkerTask {
+ setup: &'static WorkerTaskSetup,
upid: UPID,
data: Mutex<WorkerTaskData>,
abort_requested: AtomicBool,
@@ -589,17 +666,26 @@ struct WorkerTaskData {
impl WorkerTask {
- pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: String, to_stdout: bool) -> Result<Arc<Self>, Error> {
+ pub fn new(
+ worker_type: &str,
+ worker_id: Option<String>,
+ auth_id: String,
+ to_stdout: bool,
+ ) -> Result<Arc<Self>, Error> {
+
+ let setup = worker_task_setup()?;
+
let upid = UPID::new(worker_type, worker_id, auth_id)?;
let task_id = upid.task_id;
- let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR);
+ let mut path = setup.taskdir.clone();
path.push(format!("{:02X}", upid.pstart & 255));
- let backup_user = pbs_config::backup_user()?;
+ let dir_opts = setup.file_opts.clone()
+ .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
- create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?;
+ create_path(&path, None, Some(dir_opts))?;
path.push(upid.to_string());
@@ -608,12 +694,13 @@ impl WorkerTask {
exclusive: true,
prefix_time: true,
read: true,
+ file_opts: setup.file_opts.clone(),
..Default::default()
};
let logger = FileLogger::new(&path, logger_options)?;
- nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
let worker = Arc::new(Self {
+ setup,
upid: upid.clone(),
abort_requested: AtomicBool::new(false),
data: Mutex::new(WorkerTaskData {
@@ -631,7 +718,7 @@ impl WorkerTask {
proxmox_rest_server::set_worker_count(hash.len());
}
- update_active_workers(Some(&upid))?;
+ setup.update_active_workers(Some(&upid))?;
Ok(worker)
}
@@ -714,7 +801,7 @@ impl WorkerTask {
self.log(state.result_text());
WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
- let _ = update_active_workers(None);
+ let _ = self.setup.update_active_workers(None);
proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
}
--
2.30.2
next prev parent reply other threads:[~2021-09-23 10:13 UTC|newest]
Thread overview: 8+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-09-23 10:13 [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Dietmar Maurer
2021-09-23 10:13 ` Dietmar Maurer [this message]
2021-09-23 11:36 ` [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group Fabian Grünbichler
2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 3/6] use UPID and systemd helpers from proxmox 0.13.4 Dietmar Maurer
2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 4/6] move worker_task.rs into proxmox-rest-server crate Dietmar Maurer
2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-daily-update: setup worker and command socket Dietmar Maurer
2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 6/6] proxmox-backup-manager: " Dietmar Maurer
2021-09-23 13:20 ` [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Fabian Grünbichler
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20210923101329.950146-2-dietmar@proxmox.com \
--to=dietmar@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.