* [pdm-devel] [PATCH datacenter-manager 1/2] server: add jobstate module from PBS
2025-11-27 15:37 [pdm-devel] [PATCH datacenter-manager 0/2] access/auth/task-log rotation Lukas Wagner
@ 2025-11-27 15:37 ` Lukas Wagner
2025-11-27 15:37 ` [pdm-devel] [PATCH datacenter-manager 2/2] server: add task log, auth log and access log rotation Lukas Wagner
1 sibling, 0 replies; 3+ messages in thread
From: Lukas Wagner @ 2025-11-27 15:37 UTC (permalink / raw)
To: pdm-devel
This is a general helper module that helps with keeping track of
periodic tasks. Thus far we have not really needed it in PDM (although
it could be useful for remote update fetching), but this changes with
task log rotation which will be added next.
The module was copied as is and then adapted where needed to make it
work with PDM; some types and paths were changed. In the future this
should be broken out and added to some shared crate (could be a new
micro crate if no better place comes to mind), but given the time
constraints a copy is good enough for now. This module will probably
remain pretty unchanged in the near future, so the risk of diverging
implementations should be pretty small.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
server/src/jobstate.rs | 299 +++++++++++++++++++++++++++++++++++++++++
server/src/lib.rs | 1 +
2 files changed, 300 insertions(+)
create mode 100644 server/src/jobstate.rs
diff --git a/server/src/jobstate.rs b/server/src/jobstate.rs
new file mode 100644
index 00000000..c8e0d766
--- /dev/null
+++ b/server/src/jobstate.rs
@@ -0,0 +1,299 @@
+// FIXME: This was copied from PBS - move this to a shared crate and then make both
+// use the same implementation. Maybe use this opportunity to
+// - not use hard-coded paths
+// - add tests
+// - audit the public API in general and improve if needed
+
+use std::path::{Path, PathBuf};
+
+use anyhow::{bail, format_err, Error};
+use nix::sys::stat::Mode;
+use serde::{Deserialize, Serialize};
+
+use proxmox_product_config::ApiLockGuard;
+use proxmox_rest_server::{upid_read_status, worker_is_active_local, TaskState};
+use proxmox_sys::fs::{create_path, file_read_optional_string, replace_file};
+use proxmox_time::CalendarEvent;
+
+use pbs_api_types::JobScheduleStatus;
+
+use pdm_api_types::UPID;
+
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+/// Represents the State of a specific Job
+pub enum JobState {
+ /// A job was created at 'time', but never started/finished
+ Created { time: i64 },
+ /// The Job was last started in 'upid',
+ Started { upid: String },
+ /// The Job was last started in 'upid', which finished with 'state', and was last updated at 'updated'
+ Finished {
+ upid: String,
+ state: TaskState,
+ updated: Option<i64>,
+ },
+}
+
+/// Represents a Job and holds the correct lock
+pub struct Job {
+ jobtype: String,
+ jobname: String,
+ /// The State of the job
+ pub state: JobState,
+ _lock: ApiLockGuard,
+}
+
+const JOB_STATE_BASEDIR: &str = concat!(pdm_buildcfg::PDM_STATE_DIR_M!(), "/jobstates");
+
+/// Create jobstate state dir with correct permission
+pub fn create_jobstate_dir() -> Result<(), Error> {
+ let mode = Mode::from_bits_truncate(0o0750);
+ let opts = proxmox_product_config::default_create_options().perm(mode);
+
+ create_path(JOB_STATE_BASEDIR, Some(opts), Some(opts))
+ .map_err(|err: Error| format_err!("unable to create job state dir - {err}"))?;
+
+ Ok(())
+}
+
+fn get_path(jobtype: &str, jobname: &str) -> PathBuf {
+ let mut path = PathBuf::from(JOB_STATE_BASEDIR);
+ path.push(format!("{jobtype}-{jobname}.json"));
+ path
+}
+
+fn get_lock<P>(path: P) -> Result<ApiLockGuard, Error>
+where
+ P: AsRef<Path>,
+{
+ let mut path = path.as_ref().to_path_buf();
+ path.set_extension("lck");
+ proxmox_product_config::open_api_lockfile(path, None, true)
+}
+
+/// Removes the statefile of a job; this is useful if we delete a job
+pub fn remove_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
+ let mut path = get_path(jobtype, jobname);
+ let _lock = get_lock(&path)?;
+ if let Err(err) = std::fs::remove_file(&path) {
+ if err.kind() != std::io::ErrorKind::NotFound {
+ bail!("cannot remove statefile for {jobtype} - {jobname}: {err}");
+ }
+ }
+ path.set_extension("lck");
+ if let Err(err) = std::fs::remove_file(&path) {
+ if err.kind() != std::io::ErrorKind::NotFound {
+ bail!("cannot remove lockfile for {jobtype} - {jobname}: {err}");
+ }
+ }
+ Ok(())
+}
+
+/// Creates the statefile with the state 'Created'
+/// overwrites if it exists already
+pub fn create_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
+ let mut job = Job::new(jobtype, jobname)?;
+ job.write_state()
+}
+
+/// Tries to update the state file with the current time
+/// if the job is currently running, does nothing.
+/// Intended for use when the schedule changes.
+pub fn update_job_last_run_time(jobtype: &str, jobname: &str) -> Result<(), Error> {
+ let mut job = match Job::new(jobtype, jobname) {
+ Ok(job) => job,
+ Err(_) => return Ok(()), // was locked (running), so do not update
+ };
+ let time = proxmox_time::epoch_i64();
+
+ job.state = match JobState::load(jobtype, jobname)? {
+ JobState::Created { .. } => JobState::Created { time },
+ JobState::Started { .. } => return Ok(()), // currently running (without lock?)
+ JobState::Finished {
+ upid,
+ state,
+ updated: _,
+ } => JobState::Finished {
+ upid,
+ state,
+ updated: Some(time),
+ },
+ };
+ job.write_state()
+}
+
+/// Returns the last run time of a job by reading the statefile
+/// Note that this is not locked
+pub fn last_run_time(jobtype: &str, jobname: &str) -> Result<i64, Error> {
+ match JobState::load(jobtype, jobname)? {
+ JobState::Created { time } => Ok(time),
+ JobState::Finished {
+ updated: Some(time),
+ ..
+ } => Ok(time),
+ JobState::Started { upid }
+ | JobState::Finished {
+ upid,
+ state: _,
+ updated: None,
+ } => {
+ let upid: UPID = upid
+ .parse()
+ .map_err(|err| format_err!("could not parse upid from state: {err}"))?;
+ Ok(upid.starttime)
+ }
+ }
+}
+
+impl JobState {
+ /// Loads and deserializes the jobstate from type and name.
+ /// When the loaded state indicates a started UPID,
+ /// we go and check if it has already stopped, and
+ /// return the correct state.
+ ///
+ /// This does not update the state in the file.
+ pub fn load(jobtype: &str, jobname: &str) -> Result<Self, Error> {
+ if let Some(state) = file_read_optional_string(get_path(jobtype, jobname))? {
+ match serde_json::from_str(&state)? {
+ JobState::Started { upid } => {
+ let parsed: UPID = upid
+ .parse()
+ .map_err(|err| format_err!("error parsing upid: {err}"))?;
+
+ if !worker_is_active_local(&parsed) {
+ let state = upid_read_status(&parsed).unwrap_or(TaskState::Unknown {
+ endtime: parsed.starttime,
+ });
+
+ Ok(JobState::Finished {
+ upid,
+ state,
+ updated: None,
+ })
+ } else {
+ Ok(JobState::Started { upid })
+ }
+ }
+ other => Ok(other),
+ }
+ } else {
+ Ok(JobState::Created {
+ time: proxmox_time::epoch_i64() - 30,
+ })
+ }
+ }
+}
+
+impl Job {
+ /// Creates a new instance of a job with the correct lock held
+ /// (will be held until the job is dropped again).
+ ///
+ /// This does not load the state from the file, to do that,
+ /// 'load' must be called
+ pub fn new(jobtype: &str, jobname: &str) -> Result<Self, Error> {
+ let path = get_path(jobtype, jobname);
+
+ let _lock = get_lock(path)?;
+
+ Ok(Self {
+ jobtype: jobtype.to_string(),
+ jobname: jobname.to_string(),
+ state: JobState::Created {
+ time: proxmox_time::epoch_i64(),
+ },
+ _lock,
+ })
+ }
+
+ /// Start the job and update the statefile accordingly
+ /// Fails if the job was already started
+ pub fn start(&mut self, upid: &str) -> Result<(), Error> {
+ if let JobState::Started { .. } = self.state {
+ bail!("cannot start job that is started!");
+ }
+
+ self.state = JobState::Started {
+ upid: upid.to_string(),
+ };
+
+ self.write_state()
+ }
+
+ /// Finish the job and update the statefile accordingly with the given taskstate
+ /// Fails if the job was not yet started
+ pub fn finish(&mut self, state: TaskState) -> Result<(), Error> {
+ let upid = match &self.state {
+ JobState::Created { .. } => bail!("cannot finish when not started"),
+ JobState::Started { upid } => upid,
+ JobState::Finished { upid, .. } => upid,
+ }
+ .to_string();
+
+ self.state = JobState::Finished {
+ upid,
+ state,
+ updated: None,
+ };
+
+ self.write_state()
+ }
+
+ pub fn jobtype(&self) -> &str {
+ &self.jobtype
+ }
+
+ pub fn jobname(&self) -> &str {
+ &self.jobname
+ }
+
+ fn write_state(&mut self) -> Result<(), Error> {
+ let serialized = serde_json::to_string(&self.state)?;
+ let path = get_path(&self.jobtype, &self.jobname);
+ let options = proxmox_product_config::default_create_options();
+
+ replace_file(path, serialized.as_bytes(), options, false)
+ }
+}
+
+pub fn compute_schedule_status(
+ job_state: &JobState,
+ schedule: Option<&str>,
+) -> Result<JobScheduleStatus, Error> {
+ let (upid, endtime, state, last) = match job_state {
+ JobState::Created { time } => (None, None, None, *time),
+ JobState::Started { upid } => {
+ let parsed_upid: UPID = upid.parse()?;
+ (Some(upid), None, None, parsed_upid.starttime)
+ }
+ JobState::Finished {
+ upid,
+ state,
+ updated,
+ } => {
+ let last = updated.unwrap_or_else(|| state.endtime());
+ (
+ Some(upid),
+ Some(state.endtime()),
+ Some(state.to_string()),
+ last,
+ )
+ }
+ };
+
+ let mut status = JobScheduleStatus {
+ last_run_upid: upid.map(String::from),
+ last_run_state: state,
+ last_run_endtime: endtime,
+ ..Default::default()
+ };
+
+ if let Some(schedule) = schedule {
+ if let Ok(event) = schedule.parse::<CalendarEvent>() {
+ // ignore errors
+ status.next_run = event.compute_next_event(last).unwrap_or(None);
+ }
+ }
+
+ Ok(status)
+}
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 0f25aa71..bd8660a7 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -5,6 +5,7 @@ pub mod api;
pub mod auth;
pub mod context;
pub mod env;
+pub mod jobstate;
pub mod metric_collection;
pub mod parallel_fetcher;
pub mod remote_cache;
--
2.47.3
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
^ permalink raw reply [flat|nested] 3+ messages in thread

* [pdm-devel] [PATCH datacenter-manager 2/2] server: add task log, auth log and access log rotation
2025-11-27 15:37 [pdm-devel] [PATCH datacenter-manager 0/2] access/auth/task-log rotation Lukas Wagner
2025-11-27 15:37 ` [pdm-devel] [PATCH datacenter-manager 1/2] server: add jobstate module from PBS Lukas Wagner
@ 2025-11-27 15:37 ` Lukas Wagner
1 sibling, 0 replies; 3+ messages in thread
From: Lukas Wagner @ 2025-11-27 15:37 UTC (permalink / raw)
To: pdm-devel
This adds a periodic task that runs once a day at midnight. This task
will rotate the task log archive, the auth log and also the access log
file.
Copied from PBS and adapted as needed. We don't have a place yet to
configure the 'max_days' parameter for rotating the task log archive, so
this one is always set to `None` at the moment.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
server/src/bin/proxmox-datacenter-api/main.rs | 2 +-
.../proxmox-datacenter-api/tasks/logrotate.rs | 195 ++++++++++++++++++
.../bin/proxmox-datacenter-api/tasks/mod.rs | 2 +
.../bin/proxmox-datacenter-privileged-api.rs | 5 +-
4 files changed, 201 insertions(+), 3 deletions(-)
create mode 100644 server/src/bin/proxmox-datacenter-api/tasks/logrotate.rs
diff --git a/server/src/bin/proxmox-datacenter-api/main.rs b/server/src/bin/proxmox-datacenter-api/main.rs
index 0e80c829..7459e68f 100644
--- a/server/src/bin/proxmox-datacenter-api/main.rs
+++ b/server/src/bin/proxmox-datacenter-api/main.rs
@@ -426,9 +426,9 @@ async fn run_task_scheduler() {
async fn schedule_tasks() -> Result<(), Error> {
// TODO: move out to own module, refactor PBS stuff for reuse & then add:
- // - task log rotation
// - stats (rrd) collection
// - ...?
+ tasks::logrotate::schedule_task_log_rotate().await;
Ok(())
}
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/logrotate.rs b/server/src/bin/proxmox-datacenter-api/tasks/logrotate.rs
new file mode 100644
index 00000000..6afb069b
--- /dev/null
+++ b/server/src/bin/proxmox-datacenter-api/tasks/logrotate.rs
@@ -0,0 +1,195 @@
+use anyhow::{format_err, Error};
+
+use proxmox_lang::try_block;
+use proxmox_rest_server::WorkerTask;
+use proxmox_sys::logrotate::LogRotate;
+use proxmox_time::CalendarEvent;
+
+use pdm_api_types::Authid;
+use server::jobstate::{self, Job, JobState};
+
+/// Rotate task logs, auth logs and access logs.
+///
+/// This task runs every day at midnight, except when it has never run before, then it runs
+/// immediately.
+pub async fn schedule_task_log_rotate() {
+ let worker_type = "logrotate";
+ let job_id = "access-log_and_task-archive";
+
+ // schedule daily at 00:00 like normal logrotate
+ let schedule = "00:00";
+
+ if !check_schedule(worker_type, schedule, job_id) {
+ // if we never ran the rotation, schedule instantly
+ match JobState::load(worker_type, job_id) {
+ Ok(JobState::Created { .. }) => {}
+ _ => return,
+ }
+ }
+
+ let mut job = match Job::new(worker_type, job_id) {
+ Ok(job) => job,
+ Err(_) => return, // could not get lock
+ };
+
+ if let Err(err) = WorkerTask::new_thread(
+ worker_type,
+ None,
+ Authid::root_auth_id().to_string(),
+ false,
+ move |worker| {
+ job.start(&worker.upid().to_string())?;
+ proxmox_log::info!("starting task log rotation");
+
+ let result = try_block!({
+ let max_size = 512 * 1024 - 1; // an entry has ~ 100b, so > 5000 entries/file
+ let max_files = 20; // times twenty files gives > 100000 task entries
+
+ // TODO: Make this configurable
+ let max_days = None;
+
+ let options = proxmox_product_config::default_create_options();
+
+ let has_rotated = proxmox_rest_server::rotate_task_log_archive(
+ max_size,
+ true,
+ Some(max_files),
+ max_days,
+ Some(options),
+ )?;
+
+ if has_rotated {
+ log::info!("task log archive was rotated");
+ } else {
+ log::info!("task log archive was not rotated");
+ }
+
+ let max_size = 32 * 1024 * 1024 - 1;
+ let max_files = 14;
+
+ let mut logrotate = LogRotate::new(
+ pdm_buildcfg::API_ACCESS_LOG_FN,
+ true,
+ Some(max_files),
+ Some(options),
+ )?;
+
+ if logrotate.rotate(max_size)? {
+ println!("rotated access log, telling daemons to re-open log file");
+ proxmox_async::runtime::block_on(command_reopen_access_logfiles())?;
+ log::info!("API access log was rotated");
+ } else {
+ log::info!("API access log was not rotated");
+ }
+
+ let mut logrotate = LogRotate::new(
+ pdm_buildcfg::API_AUTH_LOG_FN,
+ true,
+ Some(max_files),
+ Some(options),
+ )?;
+
+ if logrotate.rotate(max_size)? {
+ println!("rotated auth log, telling daemons to re-open log file");
+ proxmox_async::runtime::block_on(command_reopen_auth_logfiles())?;
+ log::info!("API authentication log was rotated");
+ } else {
+ log::info!("API authentication log was not rotated");
+ }
+
+ if has_rotated {
+ log::info!("cleaning up old task logs");
+ if let Err(err) = proxmox_rest_server::cleanup_old_tasks(true) {
+ log::warn!("could not completely cleanup old tasks: {err}");
+ }
+ }
+
+ Ok(())
+ });
+
+ let status = worker.create_state(&result);
+
+ if let Err(err) = job.finish(status) {
+ eprintln!("could not finish job state for {worker_type}: {err}");
+ }
+
+ result
+ },
+ ) {
+ eprintln!("unable to start task log rotation: {err}");
+ }
+}
+
+async fn command_reopen_access_logfiles() -> Result<(), Error> {
+ // only care about the most recent daemon instance for each, proxy & api, as other older ones
+ // should not respond to new requests anyway, but only finish their current one and then exit.
+ let sock = proxmox_daemon::command_socket::this_path();
+ let f1 =
+ proxmox_daemon::command_socket::send_raw(sock, "{\"command\":\"api-access-log-reopen\"}\n");
+
+ let pid = proxmox_rest_server::read_pid(pdm_buildcfg::PDM_API_PID_FN)?;
+ let sock = proxmox_daemon::command_socket::path_from_pid(pid);
+ let f2 =
+ proxmox_daemon::command_socket::send_raw(sock, "{\"command\":\"api-access-log-reopen\"}\n");
+
+ match futures::join!(f1, f2) {
+ (Err(e1), Err(e2)) => Err(format_err!(
+ "reopen commands failed, proxy: {e1}; api: {e2}"
+ )),
+ (Err(e1), Ok(_)) => Err(format_err!("reopen commands failed, proxy: {e1}")),
+ (Ok(_), Err(e2)) => Err(format_err!("reopen commands failed, api: {e2}")),
+ _ => Ok(()),
+ }
+}
+
+async fn command_reopen_auth_logfiles() -> Result<(), Error> {
+ // only care about the most recent daemon instance for each, proxy & api, as other older ones
+ // should not respond to new requests anyway, but only finish their current one and then exit.
+ let sock = proxmox_daemon::command_socket::this_path();
+ let f1 =
+ proxmox_daemon::command_socket::send_raw(sock, "{\"command\":\"api-auth-log-reopen\"}\n");
+
+ let pid = proxmox_rest_server::read_pid(pdm_buildcfg::PDM_API_PID_FN)?;
+ let sock = proxmox_daemon::command_socket::path_from_pid(pid);
+ let f2 =
+ proxmox_daemon::command_socket::send_raw(sock, "{\"command\":\"api-auth-log-reopen\"}\n");
+
+ match futures::join!(f1, f2) {
+ (Err(e1), Err(e2)) => Err(format_err!(
+ "reopen commands failed, proxy: {e1}; api: {e2}"
+ )),
+ (Err(e1), Ok(_)) => Err(format_err!("reopen commands failed, proxy: {e1}")),
+ (Ok(_), Err(e2)) => Err(format_err!("reopen commands failed, api: {e2}")),
+ _ => Ok(()),
+ }
+}
+
+fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
+ let event: CalendarEvent = match event_str.parse() {
+ Ok(event) => event,
+ Err(err) => {
+ eprintln!("unable to parse schedule '{event_str}' - {err}");
+ return false;
+ }
+ };
+
+ let last = match jobstate::last_run_time(worker_type, id) {
+ Ok(time) => time,
+ Err(err) => {
+ eprintln!("could not get last run time of {worker_type} {id}: {err}");
+ return false;
+ }
+ };
+
+ let next = match event.compute_next_event(last) {
+ Ok(Some(next)) => next,
+ Ok(None) => return false,
+ Err(err) => {
+ eprintln!("compute_next_event for '{event_str}' failed - {err}");
+ return false;
+ }
+ };
+
+ let now = proxmox_time::epoch_i64();
+ next <= now
+}
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
index f4d1d3a1..b6c3a70c 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
@@ -1,3 +1,5 @@
+pub mod logrotate;
+
pub mod remote_node_mapping;
pub mod remote_tasks;
pub mod remote_updates;
diff --git a/server/src/bin/proxmox-datacenter-privileged-api.rs b/server/src/bin/proxmox-datacenter-privileged-api.rs
index 14ff4bf4..9b0a037b 100644
--- a/server/src/bin/proxmox-datacenter-privileged-api.rs
+++ b/server/src/bin/proxmox-datacenter-privileged-api.rs
@@ -30,10 +30,10 @@ fn main() -> Result<(), Error> {
.tasklog_pbs()
.init()?;
- create_directories()?;
-
proxmox_product_config::init(pdm_config::api_user()?, pdm_config::priv_user()?);
+ create_directories()?;
+
let mut args = std::env::args();
args.next();
for arg in args {
@@ -63,6 +63,7 @@ fn create_directories() -> Result<(), Error> {
let api_user = pdm_config::api_user()?;
pdm_config::setup::create_configdir()?;
+ server::jobstate::create_jobstate_dir()?;
pdm_config::setup::mkdir_perms(
pdm_buildcfg::PDM_RUN_DIR,
--
2.47.3
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
^ permalink raw reply [flat|nested] 3+ messages in thread