From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 7AFA91FF17E for ; Thu, 27 Nov 2025 16:37:24 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 411407F15; Thu, 27 Nov 2025 16:37:45 +0100 (CET) From: Lukas Wagner To: pdm-devel@lists.proxmox.com Date: Thu, 27 Nov 2025 16:37:07 +0100 Message-ID: <20251127153708.393210-2-l.wagner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20251127153708.393210-1-l.wagner@proxmox.com> References: <20251127153708.393210-1-l.wagner@proxmox.com> MIME-Version: 1.0 X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1764257792329 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.032 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pdm-devel] [PATCH datacenter-manager 1/2] server: add jobstate module from PBS X-BeenThere: pdm-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Datacenter Manager development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pdm-devel-bounces@lists.proxmox.com Sender: "pdm-devel" This is a general helper module that helps with keeping track of periodic tasks. Thus far we have not really needed it in PDM (although it could be useful for remote update fetching), but this changes with task log rotation which will be added next. 
The module was copied as is and then adapted where needed to make it work with PDM; some types and paths were changed. In the future this should be broken out and added to some shared crate (could be a new micro crate if no better place comes to mind), but given the time constraints a copy is good enough for now. This module will probably remain pretty unchanged in the near future, so the risk of diverging implementations should be pretty small. Signed-off-by: Lukas Wagner --- server/src/jobstate.rs | 299 +++++++++++++++++++++++++++++++++++++++++ server/src/lib.rs | 1 + 2 files changed, 300 insertions(+) create mode 100644 server/src/jobstate.rs diff --git a/server/src/jobstate.rs b/server/src/jobstate.rs new file mode 100644 index 00000000..c8e0d766 --- /dev/null +++ b/server/src/jobstate.rs @@ -0,0 +1,299 @@ +// FIXME: This was copied from PBS - move this to a shared crate and then make both +// use the same implementation. Maybe use this opportunity to +// - not use hard-coded paths +// - add tests +// - audit the public API in general and improve if needed + +use std::path::{Path, PathBuf}; + +use anyhow::{bail, format_err, Error}; +use nix::sys::stat::Mode; +use serde::{Deserialize, Serialize}; + +use proxmox_product_config::ApiLockGuard; +use proxmox_rest_server::{upid_read_status, worker_is_active_local, TaskState}; +use proxmox_sys::fs::{create_path, file_read_optional_string, replace_file}; +use proxmox_time::CalendarEvent; + +use pbs_api_types::JobScheduleStatus; + +use pdm_api_types::UPID; + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +/// Represents the persisted state of a specific job (stored as JSON, variant names in kebab-case) +pub enum JobState { + /// A job was created at 'time', but never started/finished + Created { time: i64 }, + /// The job was last started in 'upid' and is not (yet) known to have finished + Started { upid: String }, + /// The job was last started in 'upid', which finished with 'state'; 'updated' is only set by `update_job_last_run_time` + Finished { + upid: String, + state: TaskState, + updated: Option, + }, 
+} + +/// Represents a job and holds the lock on its `.lck` file until the `Job` is dropped +pub struct Job { + jobtype: String, + jobname: String, + /// The state of the job (written to the state file on `start`/`finish`) + pub state: JobState, + _lock: ApiLockGuard, +} + +const JOB_STATE_BASEDIR: &str = concat!(pdm_buildcfg::PDM_STATE_DIR_M!(), "/jobstates"); + +/// Create the job state directory (JOB_STATE_BASEDIR) with mode 0750 +pub fn create_jobstate_dir() -> Result<(), Error> { + let mode = Mode::from_bits_truncate(0o0750); + let opts = proxmox_product_config::default_create_options().perm(mode); + + create_path(JOB_STATE_BASEDIR, Some(opts), Some(opts)) + .map_err(|err: Error| format_err!("unable to create job state dir - {err}"))?; + + Ok(()) +} + +fn get_path(jobtype: &str, jobname: &str) -> PathBuf { + let mut path = PathBuf::from(JOB_STATE_BASEDIR); + path.push(format!("{jobtype}-{jobname}.json")); + path +} + +fn get_lock

(path: P) -> Result +where + P: AsRef, +{ + let mut path = path.as_ref().to_path_buf(); + path.set_extension("lck"); + proxmox_product_config::open_api_lockfile(path, None, true) +} + +/// Removes the state file and the lock file of a job (missing files are ignored); useful when a job is deleted +pub fn remove_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> { + let mut path = get_path(jobtype, jobname); + let _lock = get_lock(&path)?; + if let Err(err) = std::fs::remove_file(&path) { + if err.kind() != std::io::ErrorKind::NotFound { + bail!("cannot remove statefile for {jobtype} - {jobname}: {err}"); + } + } + path.set_extension("lck"); + if let Err(err) = std::fs::remove_file(&path) { + if err.kind() != std::io::ErrorKind::NotFound { + bail!("cannot remove lockfile for {jobtype} - {jobname}: {err}"); + } + } + Ok(()) +} + +/// Creates the state file with the state 'Created' (current time); +/// overwrites it if it exists already +pub fn create_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> { + let mut job = Job::new(jobtype, jobname)?; + job.write_state() +} + +/// Tries to update the state file with the current time; +/// if the job is currently running (or its lock cannot be acquired), does nothing. +/// Intended for use when the schedule changes. +pub fn update_job_last_run_time(jobtype: &str, jobname: &str) -> Result<(), Error> { + let mut job = match Job::new(jobtype, jobname) { + Ok(job) => job, + Err(_) => return Ok(()), // was locked (running), so do not update; NOTE(review): any other Job::new error is swallowed here too + }; + let time = proxmox_time::epoch_i64(); + + job.state = match JobState::load(jobtype, jobname)? { + JobState::Created { .. } => JobState::Created { time }, + JobState::Started { .. } => return Ok(()), // load() saw an active local worker, so the job is still running even though we hold the lock 
+ JobState::Finished { + upid, + state, + updated: _, + } => JobState::Finished { + upid, + state, + updated: Some(time), + }, + }; + job.write_state() +} + +/// Returns the last run time of a job by reading the state file (the 'Created'/'updated' time if set, else the start time of the UPID) +/// Note that this does not acquire the job lock +pub fn last_run_time(jobtype: &str, jobname: &str) -> Result { + match JobState::load(jobtype, jobname)? { + JobState::Created { time } => Ok(time), + JobState::Finished { + updated: Some(time), + .. + } => Ok(time), + JobState::Started { upid } + | JobState::Finished { + upid, + state: _, + updated: None, + } => { + let upid: UPID = upid + .parse() + .map_err(|err| format_err!("could not parse upid from state: {err}"))?; + Ok(upid.starttime) + } + } +} + +impl JobState { + /// Loads and deserializes the jobstate from type and name. + /// When the loaded state indicates a started UPID, + /// we go and check if it has already stopped, and + /// return the correct state ('Finished', with TaskState::Unknown if the task status cannot be read). + /// + /// This does not update the state in the file. Without a state file, 'Created' (current epoch time minus 30) is returned. + pub fn load(jobtype: &str, jobname: &str) -> Result { + if let Some(state) = file_read_optional_string(get_path(jobtype, jobname))? { + match serde_json::from_str(&state)? { + JobState::Started { upid } => { + let parsed: UPID = upid + .parse() + .map_err(|err| format_err!("error parsing upid: {err}"))?; + + if !worker_is_active_local(&parsed) { + let state = upid_read_status(&parsed).unwrap_or(TaskState::Unknown { + endtime: parsed.starttime, + }); + + Ok(JobState::Finished { + upid, + state, + updated: None, + }) + } else { + Ok(JobState::Started { upid }) + } + } + other => Ok(other), + } + } else { + Ok(JobState::Created { + time: proxmox_time::epoch_i64() - 30, + }) + } + } +} + +impl Job { + /// Creates a new instance of a job with the correct lock held + /// (will be held until the job is dropped again). 
+ /// + /// This does not load the state from the file (it starts as 'Created' with the current time); to do that, + /// `JobState::load` must be called + pub fn new(jobtype: &str, jobname: &str) -> Result { + let path = get_path(jobtype, jobname); + + let _lock = get_lock(path)?; + + Ok(Self { + jobtype: jobtype.to_string(), + jobname: jobname.to_string(), + state: JobState::Created { + time: proxmox_time::epoch_i64(), + }, + _lock, + }) + } + + /// Start the job and update the state file accordingly + /// Fails if the job was already started + pub fn start(&mut self, upid: &str) -> Result<(), Error> { + if let JobState::Started { .. } = self.state { + bail!("cannot start job that is started!"); + } + + self.state = JobState::Started { + upid: upid.to_string(), + }; + + self.write_state() + } + + /// Finish the job and update the state file accordingly with the given task state + /// Fails if the job was not yet started (an already finished job is re-finished with its last UPID) + pub fn finish(&mut self, state: TaskState) -> Result<(), Error> { + let upid = match &self.state { + JobState::Created { .. } => bail!("cannot finish when not started"), + JobState::Started { upid } => upid, + JobState::Finished { upid, .. 
} => upid, + } + .to_string(); + + self.state = JobState::Finished { + upid, + state, + updated: None, + }; + + self.write_state() + } + + pub fn jobtype(&self) -> &str { + &self.jobtype + } + + pub fn jobname(&self) -> &str { + &self.jobname + } + + fn write_state(&mut self) -> Result<(), Error> { + let serialized = serde_json::to_string(&self.state)?; + let path = get_path(&self.jobtype, &self.jobname); + let options = proxmox_product_config::default_create_options(); + + replace_file(path, serialized.as_bytes(), options, false) + } +} + +pub fn compute_schedule_status( + job_state: &JobState, + schedule: Option<&str>, +) -> Result { + let (upid, endtime, state, last) = match job_state { + JobState::Created { time } => (None, None, None, *time), + JobState::Started { upid } => { + let parsed_upid: UPID = upid.parse()?; + (Some(upid), None, None, parsed_upid.starttime) + } + JobState::Finished { + upid, + state, + updated, + } => { + let last = updated.unwrap_or_else(|| state.endtime()); + ( + Some(upid), + Some(state.endtime()), + Some(state.to_string()), + last, + ) + } + }; + + let mut status = JobScheduleStatus { + last_run_upid: upid.map(String::from), + last_run_state: state, + last_run_endtime: endtime, + ..Default::default() + }; + + if let Some(schedule) = schedule { + if let Ok(event) = schedule.parse::() { + // ignore errors (invalid schedule or failed computation): no next run time is computed + status.next_run = event.compute_next_event(last).unwrap_or(None); + } + } + + Ok(status) +} diff --git a/server/src/lib.rs b/server/src/lib.rs index 0f25aa71..bd8660a7 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -5,6 +5,7 @@ pub mod api; pub mod auth; pub mod context; pub mod env; +pub mod jobstate; pub mod metric_collection; pub mod parallel_fetcher; pub mod remote_cache; -- 2.47.3 _______________________________________________ pdm-devel mailing list pdm-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel